diff --git a/src/emc/usr_intf/emcrsh.cc b/src/emc/usr_intf/emcrsh.cc index 3ec93814be6..7f22a29faf0 100644 --- a/src/emc/usr_intf/emcrsh.cc +++ b/src/emc/usr_intf/emcrsh.cc @@ -22,6 +22,8 @@ #include #include #include +#include // for sessions counter +#include // parseCommand is not reentrant #include "emcglb.h" // EMC_NMLFILE, TRAJ_MAX_VELOCITY, etc. #include "inifile.hh" // INIFILE @@ -449,7 +451,7 @@ int enabledConn = -1; char pwd[16] = "EMC\0"; char enablePWD[16] = "EMCTOO\0"; char serverName[24] = "EMCNETSVR\0"; -int sessions = 0; +std::atomic_int sessions = 0; int maxSessions = -1; const char *cmdTokens[] = { @@ -479,6 +481,8 @@ struct option longopts[] = { {"path", 1, NULL, 'd'}, {0,0,0,0}}; +std::mutex queue_mtx; // controls access to queue + /* format string to outputbuffer (will be presented to user as result of command) */ #define OUT(...) snprintf(context->outBuf, sizeof(context->outBuf), __VA_ARGS__) @@ -524,7 +528,7 @@ static int initSocket() server_address.sin_addr.s_addr = htonl(INADDR_ANY); server_address.sin_port = htons(port); server_len = sizeof(server_address); - err = bind(server_sockfd, (struct sockaddr *)&server_address, server_len); + err = bind(server_sockfd, reinterpret_cast(&server_address), server_len); if (err) { rcs_print_error("error initializing sockets: %s\n", strerror(errno)); return err; @@ -2917,6 +2921,8 @@ cmdType lookupCommand(char *s) // handle the linuxcncrsh command in context->inBuf int parseCommand(connectionRecType *context) { + std::lock_guard lck(queue_mtx); // Only one thread enters this function at a time. + // Caveat: Change strtok to strtok_r if this ever changes. int ret = 0; char *cmdStr; @@ -3041,15 +3047,9 @@ int sockMain() sessions++; // enforce limited amount of clients that can connect simultaneously if ((maxSessions == -1) || (sessions <= maxSessions)) { - pthread_t *thrd; + pthread_t thrd; connectionRecType *context; - thrd = (pthread_t *)calloc(1, sizeof(pthread_t)); - if (!thrd) { - fprintf(stderr, "linuxcncrsh: out of memory\n"); - exit(1); - } - context = (connectionRecType *)malloc(sizeof(connectionRecType)); if (!context) { fprintf(stderr, "linuxcncrsh: out of memory\n"); @@ -3067,7 +3067,10 @@ int sockMain() context->commProt = 0; context->inBuf[0] = 0; - res = pthread_create(thrd, NULL, readClient, (void *)context); + res = pthread_create(&thrd, NULL, readClient, (void *)context); + if (!res) { + pthread_detach(thrd); + } } else { fprintf(stderr, "linuxcncrsh: maximum amount of sessions exceeded: %d\n", maxSessions); res = -1; diff --git a/src/emc/usr_intf/schedrmt.cc b/src/emc/usr_intf/schedrmt.cc index ebead0298b5..e9da35fc579 100644 --- a/src/emc/usr_intf/schedrmt.cc +++ b/src/emc/usr_intf/schedrmt.cc @@ -1,9 +1,9 @@ /******************************************************************** * Description: schedrmt.cc -* Extended telnet based scheduler interface +* Extended Telnet-based scheduler interface. * -* Derived from a work by Fred Proctor & Will Shackleford -* Further derived from work by jmkasunich +* Derived from a work by Fred Proctor & Will Shackleford. +* Further derived from work by jmkasunich. * * Author: Eric H. Johnson * License: GPL Version 2 @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include #include @@ -28,9 +30,8 @@ #include #include #include -#include - -#include +#include // for sessions counter +#include // to control access to queue #include "rcs.hh" #include "posemath.h" // PM_POSE, TO_RAD @@ -252,10 +253,9 @@ typedef struct { char progName[256];} connectionRecType; int port = 5008; -int server_sockfd, client_sockfd; -socklen_t server_len, client_len; +int server_sockfd; +socklen_t server_len; struct sockaddr_in server_address; -struct sockaddr_in client_address; bool useSockets = true; int tokenIdx; const char *delims = " \n\r\0"; @@ -263,7 +263,7 @@ int enabledConn = -1; char pwd[16] = "EMC\0"; char enablePWD[16] = "EMCTOO\0"; char serverName[24] = "EMCNETSVR\0"; -int sessions = 0; +std::atomic_int sessions = 0; int maxSessions = -1; float pollDelay = 1.0; @@ -285,8 +285,9 @@ struct option longopts[] = { {0,0,0,0} }; +std::mutex queue_mtx; // controls access to queue -static void thisQuit() +static void freeAllBuffers() { EMC_NULL emc_null_msg; @@ -312,8 +313,12 @@ static void thisQuit() delete emcCommandBuffer; emcCommandBuffer = 0; } +} - exit(0); +static void thisQuit() +{ + freeAllBuffers(); + _exit(0); } static int initSockets() @@ -323,12 +328,13 @@ static int initSockets() server_address.sin_addr.s_addr = htonl(INADDR_ANY); server_address.sin_port = htons(port); server_len = sizeof(server_address); - bind(server_sockfd, (struct sockaddr *)&server_address, server_len); + bind(server_sockfd, reinterpret_cast(&server_address), server_len); listen(server_sockfd, 5); signal(SIGCHLD, SIG_IGN); return 0; } +// attached to signal SIGINT in main() static void sigQuit(int /*sig*/) { thisQuit(); @@ -375,8 +381,8 @@ static int commandHello(connectionRecType *context) static int checkOnOff(char *s) { - static const char *onStr = "ON"; - static const char *offStr = "OFF"; + static const char onStr[] = "ON"; + static const char offStr[] = "OFF"; if (s == NULL) return -1; strupr(s); @@ -387,8 +393,8 @@ static int checkOnOff(char *s) static int checkBinaryASCII(char *s) { - static const char *binaryStr = "BINARY"; - static const char *ASCIIStr = "ASCII"; + static const char binaryStr[] = "BINARY"; + static const char ASCIIStr[] = "ASCII"; if (s == NULL) return -1; strupr(s); @@ -399,10 +405,10 @@ static int checkBinaryASCII(char *s) static queueStatusType checkMode(char *s) { - static const char *runStr = "RUN"; - static const char *stopStr = "STOP"; - static const char *pauseStr = "PAUSE"; - static const char *resumeStr = "RESUME"; + static const char runStr[] = "RUN"; + static const char stopStr[] = "STOP"; + static const char pauseStr[] = "PAUSE"; + static const char resumeStr[] = "RESUME"; if (s == NULL) return qsError; strupr(s); @@ -479,9 +485,7 @@ static cmdResponseType setCommMode(char *s, connectionRecType *context) static cmdResponseType setCommProt(char * /*s*/, connectionRecType *context) { - char *pVersion; - - pVersion = strtok(NULL, delims); + const char *pVersion = strtok(NULL, delims); if (pVersion == NULL) return rtStandardError; rtapi_strxcpy(context->version, pVersion); return rtNoError; @@ -491,7 +495,7 @@ static cmdResponseType setQMode(char *s, connectionRecType * /*context*/) { queueStatusType st; - st = checkMode(s); + st = checkMode(s); // transcribes s to all caps switch (st) { case qsStop: queueStop(); break; case qsRun: queueStart(); break; @@ -625,9 +629,9 @@ static cmdResponseType setPollRate(char *s, connectionRecType * /*context*/) int commandSet(connectionRecType *context) { - static const char *setNakStr = "SET NAK\n\r"; - static const char *setCmdNakStr = "SET %s NAK\n\r"; - static const char *ackStr = "SET %s ACK\n\r"; + static const char setNakStr[] = "SET NAK\n\r"; + static const char setCmdNakStr[] = "SET %s NAK\n\r"; + static const char ackStr[] = "SET %s ACK\n\r"; setCommandType cmd; char *pch; cmdResponseType ret = rtNoError; @@ -690,7 +694,7 @@ int commandSet(connectionRecType *context) static cmdResponseType getEcho(char * /*s*/, connectionRecType *context) { - const char *pEchoStr = "ECHO %s"; + const char pEchoStr[] = "ECHO %s"; if (context->echo) snprintf(context->outBuf, sizeof(context->outBuf), pEchoStr, "ON"); else snprintf(context->outBuf, sizeof(context->outBuf), pEchoStr, "OFF"); @@ -699,7 +703,7 @@ static cmdResponseType getEcho(char * /*s*/, connectionRecType *context) static cmdResponseType getVerbose(char * /*s*/, connectionRecType *context) { - const char *pVerboseStr = "VERBOSE %s"; + const char pVerboseStr[] = "VERBOSE %s"; if (context->verbose) snprintf(context->outBuf, sizeof(context->outBuf), pVerboseStr, "ON"); else snprintf(context->outBuf, sizeof(context->outBuf), pVerboseStr, "OFF"); @@ -708,7 +712,7 @@ static cmdResponseType getVerbose(char * /*s*/, connectionRecType *context) static cmdResponseType getEnable(char * /*s*/, connectionRecType *context) { - const char *pEnableStr = "ENABLE %s"; + const char pEnableStr[] = "ENABLE %s"; if (context->cliSock == enabledConn) // if (context->enabled == true) @@ -719,7 +723,7 @@ static cmdResponseType getEnable(char * /*s*/, connectionRecType *context) static cmdResponseType getConfig(char * /*s*/, connectionRecType *context) { - const char *pConfigStr = "CONFIG"; + const char pConfigStr[] = "CONFIG"; rtapi_strxcpy(context->outBuf, pConfigStr); return rtNoError; @@ -727,7 +731,7 @@ static cmdResponseType getConfig(char * /*s*/, connectionRecType *context) static cmdResponseType getCommMode(char * /*s*/, connectionRecType *context) { - const char *pCommModeStr = "COMM_MODE %s"; + const char pCommModeStr[] = "COMM_MODE %s"; switch (context->commMode) { case 0: snprintf(context->outBuf, sizeof(context->outBuf), pCommModeStr, "ASCII"); break; @@ -738,7 +742,7 @@ static cmdResponseType getCommMode(char * /*s*/, connectionRecType *context) static cmdResponseType getCommProt(char * /*s*/, connectionRecType *context) { - const char *pCommProtStr = "COMM_PROT %s"; + const char pCommProtStr[] = "COMM_PROT %s"; snprintf(context->outBuf, sizeof(context->outBuf), pCommProtStr, context->version); return rtNoError; @@ -746,7 +750,7 @@ static cmdResponseType getCommProt(char * /*s*/, connectionRecType *context) static cmdResponseType getDebug(char * /*s*/, connectionRecType *context) { - const char *pUpdateStr = "DEBUG %d"; + const char pUpdateStr[] = "DEBUG %d"; snprintf(context->outBuf, sizeof(context->outBuf), pUpdateStr, emcStatus->debug); return rtNoError; @@ -754,7 +758,7 @@ static cmdResponseType getDebug(char * /*s*/, connectionRecType *context) static cmdResponseType getIniFile(char * /*s*/, connectionRecType *context) { - const char *pIniFile = "INIFILE %s"; + const char pIniFile[] = "INIFILE %s"; snprintf(context->outBuf, sizeof(context->outBuf), pIniFile, emc_inifile); return rtNoError; @@ -762,7 +766,7 @@ static cmdResponseType getIniFile(char * /*s*/, connectionRecType *context) static cmdResponseType getPlat(char * /*s*/, connectionRecType *context) { - const char *pPlatStr = "PLAT %s"; + const char pPlatStr[] = "PLAT %s"; snprintf(context->outBuf, sizeof(context->outBuf), pPlatStr, "Linux"); return rtNoError; @@ -770,7 +774,7 @@ static cmdResponseType getPlat(char * /*s*/, connectionRecType *context) static cmdResponseType getQMode(char * /*s*/, connectionRecType *context) { - const char *pQMode = "QMODE %s"; + const char pQMode[] = "QMODE %s"; switch (getStatus()) { case qsStop: snprintf(context->outBuf, sizeof(context->outBuf), pQMode, "STOP"); break; @@ -784,7 +788,7 @@ static cmdResponseType getQMode(char * /*s*/, connectionRecType *context) static cmdResponseType getQStatus(connectionRecType *context) { - const char *pQStatus = "QSTATUS %d %d %d %d"; + const char pQStatus[] = "QSTATUS %d %d %d %d"; snprintf(context->outBuf, sizeof(context->outBuf), pQStatus, getQueueSize(), getFirstTagId(), getLastTagId(), getQueueCRC()); return rtNoError; @@ -792,7 +796,7 @@ static cmdResponseType getQStatus(connectionRecType *context) static cmdResponseType getTagId(connectionRecType *context) { - const char *pTagId = "AUTOTAGID %d"; + const char pTagId[] = "AUTOTAGID %d"; snprintf(context->outBuf, sizeof(context->outBuf), pTagId, getNextTagId()); return rtNoError; @@ -873,8 +877,8 @@ static cmdResponseType getPollRate(connectionRecType *context) int commandGet(connectionRecType *context) { - static const char *setNakStr = "GET NAK\r\n"; - static const char *setCmdNakStr = "GET %s NAK\r\n"; + static const char setNakStr[] = "GET NAK\r\n"; + static const char setCmdNakStr[] = "GET %s NAK\r\n"; setCommandType cmd; char *pch; cmdResponseType ret = rtNoError; @@ -942,7 +946,7 @@ int commandShutdown(connectionRecType *context) if (context->cliSock == enabledConn) { printf("Shutting down\n"); thisQuit(); - return -1; + return -1; // not reached } else return 0; @@ -1033,7 +1037,7 @@ static int helpQuit(connectionRecType *context) snprintf(context->outBuf, sizeof(context->outBuf), "Usage:\n\r"); rtapi_strxcat(context->outBuf, " The quit command has the server initiate a disconnect from the client,\n\r"); rtapi_strxcat(context->outBuf, " the command has no parameters and no requirements to have negotiated\n\r"); - rtapi_strxcat(context->outBuf, " a hello, or be in control."); + rtapi_strxcat(context->outBuf, " a hello, or to be in control."); sockWrite(context); return 0; } @@ -1079,7 +1083,9 @@ commandTokenType lookupToken(char *s) int temp; while (i < cmdUnknown) { - if (strcmp(commands[i], s) == 0) return i; + if (strcmp(commands[i], s) == 0) { + return i; + } // (int)i += 1; temp = i; temp++; @@ -1090,16 +1096,19 @@ commandTokenType lookupToken(char *s) int parseCommand(connectionRecType *context) { + std::lock_guard lck(queue_mtx); // Only one thread enters this function at a time. + // Caveat: Change strtok to strtok_r if this ever changes. int ret = 0; char *pch; char s[64]; - static const char *helloNakStr = "HELLO NAK\r\n"; - static const char *shutdownNakStr = "SHUTDOWN NAK\r\n"; - static const char *helloAckStr = "HELLO ACK %s 1.1\r\n"; - static const char *setNakStr = "SET NAK\r\n"; + static const char helloNakStr[] = "HELLO NAK\r\n"; + static const char shutdownNakStr[] = "SHUTDOWN NAK\r\n"; + static const char helloAckStr[] = "HELLO ACK %s 1.1\r\n"; + static const char setNakStr[] = "SET NAK\r\n"; pch = strtok(context->inBuf, delims); snprintf(s, sizeof(s), helloAckStr, serverName); + if (pch != NULL) { strupr(pch); switch (lookupToken(pch)) { @@ -1137,91 +1146,100 @@ int parseCommand(connectionRecType *context) void *checkQueue(void * /*arg*/) { while (1) { - updateQueue(); - sleep((unsigned)pollDelay); + { + std::lock_guard lck(queue_mtx); + updateQueue(); + // the mutex shall be destroyed right after the update, + // before the sleep. } + sleep((unsigned)pollDelay); + } return 0; } -void *readClient(void * /*arg*/) +void *readClient(void *arg) { + connectionRecType *context = (connectionRecType *)arg; + char str[1600]; char buf[1600]; - unsigned int i, j; - int len; - connectionRecType *context; - - -// res = 1; - context = (connectionRecType *) malloc(sizeof(connectionRecType)); - context->cliSock = client_sockfd; - context->linked = false; - context->echo = true; - context->verbose = false; - rtapi_strxcpy(context->version, "1.0"); - rtapi_strxcpy(context->hostName, "Default"); - context->enabled = false; - context->commMode = 0; - context->commProt = 0; - context->inBuf[0] = 0; - buf[0] = 0; - - while (1) { - len = read(context->cliSock, &str, 1600); + int res = 0; + + do { + int len = read(context->cliSock, &str, sizeof(str)-1); if (len <= 0) goto finished; str[len] = 0; + buf[0] = 0; rtapi_strxcat(buf, str); - if (!memchr(str, 0x0d, strlen(str))) continue; - if (context->echo && context->linked) + if (!memchr(str, '\r', len)) continue; + if (context->echo && context->linked) { if(write(context->cliSock, buf, strlen(buf)) != (ssize_t)strlen(buf)) { fprintf(stderr, "emcrsh: write() failed: %s", strerror(errno)); } - i = 0; - j = 0; - while (i <= strlen(buf)) { + } + // Iterate over lines in buffer and parse commands + size_t buflen = strlen(buf); // avoid multiple execution of strlen within loop + for (unsigned int i=0,j=0; i <= buflen; i++) { if ((buf[i] != '\n') && (buf[i] != '\r')) { + if (j>=sizeof(context->inBuf) - 1) { + goto finished; + } context->inBuf[j] = buf[i]; j++; } else if (j > 0) { context->inBuf[j] = 0; - if (parseCommand(context) == -1) goto finished; + res = parseCommand(context); + if (-1 == res) break; j = 0; } - i++; } - buf[0] = 0; - } + } while ( -1 != res ); finished: close(context->cliSock); free(context); - pthread_exit((void *)0); - sessions--; // FIXME: not reached + --sessions; // std::atomic_int to avoid race condition + return(NULL); // equivalent to pthread_exit() } -int sockMain() +void sockMain() { - pthread_t thrd; - int res; - while (1) { - - client_len = sizeof(client_address); - client_sockfd = accept(server_sockfd, - (struct sockaddr *)&client_address, &client_len); - if (client_sockfd < 0) exit(0); - sessions++; - if ((maxSessions == -1) || (sessions <= maxSessions)) - res = pthread_create(&thrd, NULL, readClient, (void *)NULL); - else res = -1; - if (res != 0) { - close(client_sockfd); - sessions--; + int res = -1; + struct sockaddr_in client_address; + socklen_t client_len = sizeof(client_address); + int client_sockfd = accept(server_sockfd, reinterpret_cast(&client_address), &client_len); + if (client_sockfd < 0) { + perror("sockMain: accept failed\n"); + exit(EXIT_FAILURE); } - } - return 0; + ++sessions; + if ((maxSessions == -1) || (sessions <= maxSessions)) { + connectionRecType *context = (connectionRecType *)calloc(1, sizeof(connectionRecType)); + if (context) { + // Initialize necessary context fields, rest are set to zero in calloc + context->cliSock = client_sockfd; + rtapi_strxcpy(context->hostName, "Default"); + rtapi_strxcpy(context->version, "1.0"); + context->echo = true; + pthread_t thrd; + res = pthread_create(&thrd, NULL, readClient, context); + if (res != 0) { + // error upon thread creation + free(context); + } else if (pthread_detach(thrd)) { + // no errno set by pthread_detach + rcs_print_error("sockMain: error by pthread_detach - ignored\n"); + } + } + } + if (res != 0) { + close(client_sockfd); + --sessions; + } + } } static void initMain() @@ -1246,12 +1264,9 @@ static void initMain() int main(int argc, char *argv[]) { - int opt; - pthread_t updateThread; - int res; - initMain(); // process local command line args + int opt; while((opt = getopt_long(argc, argv, "e:n:p:s:w:", longopts, NULL)) != -1) { switch(opt) { case 'e': snprintf(enablePWD, sizeof(enablePWD), "%s", optarg); break; @@ -1260,22 +1275,25 @@ int main(int argc, char *argv[]) case 's': sscanf(optarg, "%d", &maxSessions); break; case 'w': snprintf(pwd, sizeof(pwd), "%s", optarg); break; case 'd': snprintf(defaultPath, sizeof(defaultPath), "%s", optarg); break; + default: + fprintf(stderr,"Usage: %s [ -e enable-PWD ] [ -n server-name ] [ -p port ] [ -s max-sessions ] [ -w pwd ] [ -d default-path ]\n", argv[0]); + exit(EXIT_FAILURE); } } // process emc command line args if (emcGetArgs(argc, argv) != 0) { - rcs_print_error("error in argument list\n"); - exit(1); + rcs_print_error("Error in argument list\n"); + exit(EXIT_FAILURE); } // get configuration information iniLoad(emc_inifile); initSockets(); // init NML if (tryNml() != 0) { - rcs_print_error("can't connect to emc\n"); - thisQuit(); - exit(1); + rcs_print_error("Cannot connect to EMC\n"); + freeAllBuffers(); + exit(EXIT_FAILURE); } // get current serial number, and save it for restoring when we quit // so as not to interfere with real operator interface @@ -1286,9 +1304,18 @@ int main(int argc, char *argv[]) signal(SIGINT, sigQuit); schedInit(); - res = pthread_create(&updateThread, NULL, checkQueue, (void *)NULL); - if(res != 0) { perror("pthread_create"); return 1; } + pthread_t updateThread; + int res = pthread_create(&updateThread, NULL, checkQueue, (void *)NULL); + + if(res != 0) { + perror("pthread_create"); + freeAllBuffers(); + return EXIT_FAILURE; + } else { + pthread_detach(updateThread); + } if (useSockets) sockMain(); - return 0; + freeAllBuffers(); + return EXIT_SUCCESS; }