00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
/* Export decoration placed between the return type and the name of the public
 * functions in this file (e.g. com_Connect). It is defined empty here, so it
 * contributes nothing to the declaration. */
00029 #define EXPORT_BORLAND
00030
/* EXPORT_MSCPP expands to __declspec(dllexport) only when IS_DLL is defined;
 * otherwise it expands to nothing. */
00031 #ifdef IS_DLL
00032 #define EXPORT_MSCPP __declspec(dllexport)
00033 #else
00034 #define EXPORT_MSCPP
00035 #endif
00036
/* com_ListenAt() listens for binary-format connections on
 * (string port + OAA_BINARY_PORT_OFFSET). */
00037 #define OAA_BINARY_PORT_OFFSET 10
00038
00039
00040
00041
00042 #ifndef _WINDOWS
00043 #include <sys/param.h>
00044 #endif
00045 #include <stdio.h>
00046 #include <stdarg.h>
00047 #include <stdlib.h>
00048 #include <string.h>
00049 #include <errno.h>
00050 #include <math.h>
00051 #include "libcom_tcp.h"
00052 #include "libdb.h"
00053 #include "libutils.h"
00054 #include "libicl_private.h"
00055 #include "libicl.h"
00056 #include "libicl_depr.h"
00057 #include "termreader.h"
00058 #include "stringtermreader.h"
00059 #include "binarytermreader.h"
00060 #include "termsender.h"
00061 #include "stringtermsender.h"
00062 #include "binarytermsender.h"
00063 #include "liboaa.h"
00064
/* Library version string, defined in another translation unit.
 * createEvConnectEvent() embeds it in the other_version(...) parameter. */
00065 extern const char* oaa_library_version_str;
00066
00067
00068
00069
/* STREQ(a, b) is non-zero when the two C strings compare equal with strcmp().
 * Only defined here if no earlier header already provided it. */
00070 #ifndef STREQ
00071 #define STREQ(str1, str2) (strcmp((str1), (str2)) == 0)
00072 #endif
00073
/* Values accepted as the `format` argument of comConnectFormat().
 * COM_BEST_FORMAT and COM_BINARY_FORMAT both select the binary transport when
 * the peer's reply carries a "binary" parameter; any other case uses the
 * string transport. These are plain (mutable) ints with external linkage. */
00074 int COM_BEST_FORMAT = 100;
00075 int COM_STRING_FORMAT = 1;
00076 int COM_BINARY_FORMAT = 2;
00077
/* Only when built with NORMAL_GC: comConnectFormat() records every TermReader
 * and TermSender it creates in these fixed 128-slot tables. Each index holds
 * the last slot written; -1 means the table is still empty. */
00078 #ifdef NORMAL_GC
00079 static void* termReaders[128];
00080 static void* termSenders[128];
00081 static int termReadersIdx = -1;
00082 static int termSendersIdx = -1;
00083
00084
00085
00086
00087 #endif
00088
00089
/* Database of com_connection_info(Id, Protocol, Type, InfoList, Status) facts.
 * Created on demand with db_NewDB() by comConnectFormat() and com_ListenAt();
 * most accessors in this file return FALSE while it is still NULL. */
00090 ICLDatabase *commdb = NULL;
00091
/* NOTE(review): not referenced in the part of the file visible here;
 * presumably holds events queued by com_add_event_to_buffer() -- confirm. */
00097 static ICLTerm *eventBuffer = NULL;
00098
/* Value returned by tcpListenAtPort(stringPort) in com_ListenAt(); -1 until then. */
00104 static int stringListener = -1;
00105
/* Value returned by tcpListenAtPort(binaryPort) in com_ListenAt(); -1 until then. */
00111 static int binaryListener = -1;
00112
/* Port taken from the tcp(Host, Port) address given to com_ListenAt(); -1 until then. */
00118 static int stringPort = -1;
00119
/* stringPort + OAA_BINARY_PORT_OFFSET, set by com_ListenAt(); -1 until then. */
00125 static int binaryPort = -1;
00126
/* Consulted by getNextStartingSeqnum() to decide whether rand() must be seeded. */
00127 static int nextStartingSeqNum = 0;
00128
/* NOTE(review): not used in the visible part of the file; the unit of this
 * timeout (presumably seconds) must be confirmed at the point of use. */
00129 static const int HEARTBEAT_TIMEOUT = 10;
00130
00131
/* Functions with external linkage used below. com_ask_about_tcp_exception()
 * is defined later in this file; the tcp* routines and comPrintError() are
 * not defined in the part of the file visible here. */
00132 int com_ask_about_tcp_exception(int Port, char *Host);
00133 void tcpShutdown(SOCKET connection);
00134 SOCKET tcpConnect(char *hostName, int port);
00135
00136 int tcpSelect(SOCKET socket, double timeout);
00137 EXTERN void comPrintError(char *str, ...);
00138
00139
/* File-private helpers, declared up front so they can be used before their
 * definitions. Apart from close_socket(), their bodies are outside the part
 * of the file visible here. */
00140 static void checkEventBuffer(void);
00141 static void com_accept_new_connection(int listenersocket);
00142 static int com_add_event_to_buffer(ICLTerm *event);
00143 static int com_disconnect_all_connection_ids();
00144 static int com_find_all_connected_sockets(ICLTerm *ids, ICLTerm **sockets);
00145 static int com_find_all_connection_ids(ICLTerm **ids);
00146 static char *com_get_connection_id_from_socket(int socket);
00147 static char *com_get_localhost_ip_address(void);
00148 static int com_is_binary_listener(int socket);
00149 static int com_is_listening(void);
00150 static int com_read_events_into_buffer(double timeout);
00151 static int com_select_event_from_buffer(ICLTerm **event);
00152 static int com_select_event_from_socket(int socketNumber, double timeout, ICLTerm **event);
00153 static char* new_direct_client_id(void);
00154 static int opentcpport(int port);
00155 static int tcpListenAtPort(int port);
00156 static struct hostent* getHostEntry(char const* hostName);
00157
/* Platform-neutral wrapper around closesocket()/close(); defined just below. */
00158 static int close_socket(int socket);
00159
00160 #ifdef _WINDOWS
00161
00162
00163
00164 #include <windows.h>
00165 #include <winsock.h>
00166 int tcp_started = 0;
00167 int EXPORT_MSCPP tcpStartup();
00168 #include "oaa-windows.h"
00169 #else
00170
00171
00172
00173 #include <stdio.h>
00174
00175 #include <sys/types.h>
00176 #include <sys/socket.h>
00177 #include <netinet/in.h>
00178 #include <arpa/inet.h>
00179 #include <netdb.h>
00180 #include <strings.h>
00181 #include <unistd.h>
00182 #include <sys/time.h>
00183 #include <string.h>
00184
00185 #endif
00186
/*
 * Close a socket descriptor with the platform's native call:
 * closesocket() when built with _WINDOWS, close() otherwise.
 * Returns whatever the underlying call returns.
 */
static int close_socket(int socket)
{
    int status;

#ifdef _WINDOWS
    status = closesocket(socket);
#else
    status = close(socket);
#endif

    return status;
}
00195
00196 static int getNextStartingSeqnum()
00197 {
00198 int result;
00199 if(nextStartingSeqNum == 0) {
00200 srand(1023);
00201 }
00202 result = rand();
00203 return (result < 0 ? -result : result) | 2;
00204 }
00205
00217 EXPORT_MSCPP
00218 SOCKET EXPORT_BORLAND
00219 com_Connect(char *ConnectionId, ICLTerm *Address)
00220 {
00221 return comConnectFormat(ConnectionId, ICL_EMPTY, Address, COM_BEST_FORMAT);
00222 }
00223
00224 char const* commonStart = "event(ev_connect([";
00225 char const* commonEnd = "query_formats(_),other_name('libcom_tcp_formatRequest_shouldDisconnect'),other_type(client)]),[])";
00226 char const* otherVersionStart = "other_version(";
00227 char const* otherVersionEnd = ")";
00228 char const* passwordStart = "password(";
00229 char const* passwordEnd = ")";
00230 char const* comma = ",";
00231
00232 static ICLTerm* createEvConnectEvent(char const* password, ICLTerm* hostTerm)
00233 {
00234 char* tempString;
00235 char* host = icl_NewStringFromTerm(hostTerm);
00236 ICLTerm* result;
00237 int tempStringLen =
00238 strlen(commonStart) +
00239 strlen(otherVersionStart) + strlen(oaa_library_version_str) + strlen(otherVersionEnd) + strlen(comma) +
00240 (password ? strlen(passwordStart) + strlen(password) + strlen(passwordEnd) + strlen(comma) : 0) +
00241 strlen(commonEnd) + 1;
00242
00243 tempString = (char*)malloc(tempStringLen);
00244 snprintf(tempString, tempStringLen, "%s%s%s%s%s%s%s%s%s%s",
00245 commonStart,
00246 otherVersionStart, oaa_library_version_str, otherVersionEnd, comma,
00247 (password ? passwordStart : ""), (password ? password : ""), (password ? passwordEnd : ""), (password ? comma : ""),
00248 commonEnd);
00249 result = icl_NewTermFromString(tempString);
00250 free(tempString);
00251 icl_stFree(host);
00252 return result;
00253 }
00254
00255 static void com_set_connect_info(char* connectionId, ICLTerm* params, ICLTerm* host, ICLTerm* port, ICLTerm* readerInfo, ICLTerm* senderInfo, SOCKET connection)
00256 {
00257 ICLTerm *canonicalOtherAddress = NULL;
00258 ICLTerm *newConnectionInfo = NULL;
00259 ICLTerm *tempList = icl_NewList(NULL);
00260 ICLTerm *atmp;
00261 ICLTerm *templateTcpAddr = icl_NewStruct("tcp", 2, icl_CopyTerm(host), icl_CopyTerm(port));
00262 ICLTerm *typeMatcher = icl_NewStruct("type", 1, icl_NewVar("_"));
00263
00264 icl_AddToList(tempList, readerInfo, FALSE);
00265 icl_AddToList(tempList, senderInfo, FALSE);
00266
00267
00268 if(!com_GetInfo(connectionId, typeMatcher, NULL)) {
00269 int startingSequenceNumber = getNextStartingSeqnum();
00270 ICLTerm* cHost;
00271 ICLTerm* cPort;
00272 icl_AddToList(tempList, icl_NewStruct("connection", 1, icl_NewInt(connection)),FALSE);
00273 icl_AddToList(tempList, icl_NewStruct("initiated", 1, icl_NewStr("true")), FALSE);
00274 icl_AddToList(tempList, icl_NewStruct("sequence", 1, icl_NewInt(startingSequenceNumber)), FALSE);
00275 icl_AddToList(tempList, icl_NewStruct("other_acked", 1, icl_NewInt(startingSequenceNumber - 1)), FALSE);
00276
00277
00278
00279 {
00280 ICLTerm *tmp1;
00281 ICLTerm *tmp2;
00282 tmp1 = icl_NewStruct("other_address", 1, icl_NewVar("_"));
00283 if (icl_GetParamValue(tmp1, params, &tmp2)) {
00284 icl_AddToList(tempList, tmp2, TRUE);
00285 }
00286 else {
00287 cnx_CanonicalAddress(templateTcpAddr, connection, &canonicalOtherAddress);
00288 icl_AddToList(tempList, icl_NewStruct("other_address", 1, canonicalOtherAddress), FALSE);
00289 }
00290 icl_Free(tmp1);
00291 }
00292
00293 cHost = icl_CopyTerm(host);
00294 cPort = icl_CopyTerm(port);
00295 atmp = icl_NewStruct("tcp", 2, cHost, cPort);
00296
00297 newConnectionInfo = icl_NewStruct("com_connection_info", 5,
00298 icl_NewStr(connectionId),
00299 atmp,
00300
00301 icl_NewStr("client"),
00302 tempList,
00303 icl_NewStr("connected"));
00304
00305 {
00306 char* format = get64BitFormatWrapped("libcom_tcp com_Connect() Connected to %s ", ".");
00307 printf(format, icl_Str(host), icl_Int(port));
00308 printf("\n");
00309 free(format);
00310 }
00311
00312 if (newConnectionInfo) {
00313 db_Assert(commdb, newConnectionInfo, ICL_EMPTY);
00314
00315
00316 icl_Free(newConnectionInfo);
00317 }
00318 else {
00319 fprintf(stderr, "Error! %s:%i com_connection_info could not be constructed.\n", __FILE__, __LINE__);
00320 }
00321 }
00322 else {
00323 icl_AddToList(tempList, icl_NewStruct("connection", 1, icl_NewInt(connection)),FALSE);
00324
00325
00326
00327 {
00328 ICLTerm *tmp1;
00329 ICLTerm *tmp2;
00330 tmp1 = icl_NewStruct("other_address", 1, icl_NewVar("_"));
00331 if (icl_GetParamValue(tmp1, params, &tmp2)) {
00332 icl_AddToList(tempList, tmp2, TRUE);
00333 }
00334 else {
00335 cnx_CanonicalAddress(templateTcpAddr, connection, &canonicalOtherAddress);
00336 icl_AddToList(tempList, icl_NewStruct("other_address", 1, canonicalOtherAddress), FALSE);
00337 }
00338 icl_Free(tmp1);
00339 }
00340
00341 com_UpdateInfo(connectionId, icl_NewStruct("protocol", 1, icl_NewStruct("tcp", 2, icl_CopyTerm(host), icl_CopyTerm(port))));
00342 com_UpdateInfo(connectionId, icl_NewStruct("type", 1, icl_NewStr("client")));
00343 com_UpdateInfo(connectionId, icl_NewStruct("status", 1, icl_NewStr("connected")));
00344 com_UpdateInfo(connectionId, tempList);
00345 {
00346 char* format = get64BitFormatWrapped("libcom_tcp com_Connect() Connected to %s ", ".");
00347 printf(format, icl_Str(host), icl_Int(port));
00348 printf("\n");
00349 free(format);
00350 }
00351 }
00352 icl_Free(templateTcpAddr);
00353 icl_Free(typeMatcher);
00354 }
00355
00356 EXPORT_MSCPP
00357 SOCKET EXPORT_BORLAND
00358 comConnectFormat(char* ConnectionId, ICLTerm* Params,
00359 ICLTerm* Address, int format)
00360 {
00361 ICLTerm *connectionInfo = NULL, *host = NULL, *port = NULL;
00362 ICLTerm *t1;
00363 SOCKET connection = 0;
00364 SOCKET oldConnection = 0;
00365 ICLTerm *readerInfo = NULL;
00366 ICLTerm *senderInfo = NULL;
00367 int res;
00368 ICLTerm* usePassword = NULL;
00369 ICLTerm* passwordTerm = NULL;
00370 char* password = NULL;
00371
00372 #ifdef _WINDOWS
00373 if (!tcp_started)
00374 tcp_started = tcpStartup();
00375 #endif
00376
00377
00378
00379
00380
00381 if (!commdb) {
00382 commdb = db_NewDB();
00383 }
00384
00385 t1 = icl_NewStruct("tcp", 2, icl_NewVar("Host"), icl_NewVar("Port"));
00386 res = icl_Unify(Address, t1 ,&connectionInfo);
00387 if (ConnectionId && *ConnectionId && res) {
00388
00389
00390 host = icl_CopyTerm(icl_NthTerm(connectionInfo, 1));
00391 port = icl_CopyTerm(icl_NthTerm(connectionInfo, 2));
00392 icl_Free(connectionInfo);
00393
00394
00395 if (icl_IsVar(host) || icl_IsVar(port)) {
00396 ICLTerm* resolved_var = NULL;
00397
00398 oaa_ResolveVariable("default_facilitator", &resolved_var);
00399 if (icl_IsVar(host)) {
00400 icl_Free(host);
00401 host = icl_CopyTerm(icl_NthTerm(resolved_var, 1));
00402 }
00403 if (icl_IsVar(port)) {
00404 icl_Free(port);
00405 port = icl_CopyTerm(icl_NthTerm(resolved_var, 2));
00406 }
00407 icl_Free(resolved_var);
00408 }
00409
00410
00411 if (host && port && !icl_IsVar(host) && !icl_IsVar(port))
00412 connection = tcpConnect(icl_Str(host), icl_Int(port));
00413
00414 oaa_ResolveVariable("use_password", &usePassword);
00415
00416 if(usePassword != NULL) {
00417 if(icl_IsStr(usePassword) &&
00418 (strlen(icl_Str(usePassword)) == 4) &&
00419 (strncmp(icl_Str(usePassword), "true", 4) == 0)) {
00420 oaa_ResolveVariable("client_password", &passwordTerm);
00421 if(passwordTerm != NULL) {
00422 password = icl_NewStringFromTerm(passwordTerm);
00423 }
00424 else {
00425 password = strdup("_");
00426 }
00427 }
00428 }
00429
00430 if(connection > 0) {
00431
00432 TermReader* initialTr = termReader_create();
00433 StringTermReader* str;
00434 TermSender* initialTs = termSender_create();
00435 StringTermSender* sts;
00436 ICLTerm* whatFormats = NULL;
00437 ICLTerm* reply = NULL;
00438 ICLTerm* resultList = NULL;
00439 gint64 realPort = icl_Int(port);
00440 TermReader* realTr = NULL;
00441 TermSender* realTs = NULL;
00442 char* readerPtr;
00443 char* readerInfoBuf;
00444 char* senderPtr;
00445 char* senderInfoBuf;
00446 whatFormats = createEvConnectEvent(password, host);
00447 if(!whatFormats) {
00448 printf("NULL whatFormats\n");
00449 }
00450 icl_stFree(password);
00451 password = NULL;
00452
00453 str = stringTermReader_create(initialTr, connection);
00454 sts = stringTermSender_create(initialTs, connection);
00455
00456 termSender_sendTerm(initialTs, whatFormats);
00457 icl_Free(whatFormats);
00458
00459
00460
00461 if(termSender_getError(initialTs) != TERMSENDER_OKAY) {
00462 fprintf(stderr, "libcom_tcp com_Connect could not send formatRequest message: %i\n", termSender_getError(initialTs));
00463 close_socket(termSender_getSocket(initialTs));
00464 termSender_free(initialTs);
00465 termReader_free(initialTr);
00466 goto failedconnect;
00467 }
00468 reply = NULL;
00469 do {
00470
00471
00472
00473 if(reply != NULL) {
00474 icl_Free(reply);
00475 }
00476 reply = termReader_getNextTerm(initialTr, 0);
00477
00478
00479
00480 } while(termReader_getError(initialTr) == TERMREADER_TIMEOUT);
00481
00482 if(termReader_getError(initialTr) != TERMREADER_OKAY) {
00483 fprintf(stderr, "libcom_tcp com_Connect could not read next term: %i\n", termReader_getError(initialTr));
00484 close_socket(termReader_getSocket(initialTr));
00485 termReader_free(initialTr);
00486 termSender_free(initialTs);
00487 goto failedconnect;
00488 }
00489
00490 if(icl_IsStruct(icl_NthTerm(icl_NthTerm(icl_NthTerm(reply, 1), 1), 1))) {
00491 ICLTerm* resStruct = icl_NthTerm(icl_NthTerm(icl_NthTerm(reply, 1), 1), 1);
00492 char* functor = icl_Functor(resStruct);
00493 if(functor != NULL) {
00494 if((strlen(functor) == 9) &&
00495 (strncmp(functor, "exception", 9) == 0)) {
00496 char* exceptionData = icl_NewStringFromTerm(icl_NthTerm(resStruct, 1));
00497 fprintf(stderr, "Exception in comConnectFormat: %s\n", exceptionData);
00498 icl_stFree(exceptionData);
00499 goto failedconnect;
00500 }
00501 }
00502 }
00503
00504
00505
00506
00507
00508
00509 oldConnection = connection;
00510 resultList = icl_NthTerm(icl_NthTerm(icl_NthTerm(reply, 1), 1), 1);
00511 if(((format == COM_BEST_FORMAT) || (format == COM_BINARY_FORMAT)) &&
00512 (icl_ParamValueAsInt("binary", resultList, &realPort) == TRUE)) {
00513 BinaryTermReader* btr;
00514 BinaryTermSender* bts;
00515
00516
00517 printf("libcom_tcp com_Connect() using binary format\n");
00518
00519 connection = tcpConnect(icl_Str(host), (int)realPort);
00520 realTr = termReader_create();
00521 btr = binaryTermReader_create(realTr, connection);
00522 realTs = termSender_create();
00523 bts = binaryTermSender_create(realTs, connection);
00524 }
00525 else {
00526 StringTermReader* str;
00527 StringTermSender* sts;
00528
00529
00530
00531
00532 connection = tcpConnect(icl_Str(host), (int)icl_Int(port));
00533 realTr = termReader_create();
00534 str = stringTermReader_create(realTr, connection);
00535 #ifdef NORMAL_GC
00536 printf("created str (%p) from realTr(%p)\n", str, realTr);
00537 #endif
00538 CHECK_LEAKS();
00539 realTs = termSender_create();
00540 sts = stringTermSender_create(realTs, connection);
00541 CHECK_LEAKS();
00542 }
00543 CHECK_LEAKS();
00544
00545 icl_Free(reply);
00546 CHECK_LEAKS();
00547
00548 close_socket(oldConnection);
00549 termReader_free(initialTr);
00550 termSender_free(initialTs);
00551 CHECK_LEAKS();
00552 #ifdef NORMAL_GC
00553
00554
00555
00556
00557
00558
00559
00560 termReaders[++termReadersIdx] = realTr;
00561 CHECK_LEAKS();
00562 #endif
00563 CHECK_LEAKS();
00564 readerPtr = pointerToString(realTr);
00565 readerInfoBuf = (char*)malloc(strlen(readerPtr) + 10 + 1);
00566 CHECK_LEAKS();
00567 strncpy(readerInfoBuf, "reader('", 8);
00568 strncpy(readerInfoBuf + 8, readerPtr, strlen(readerPtr));
00569 readerInfoBuf[strlen(readerPtr) + 8] = '\'';
00570 readerInfoBuf[strlen(readerPtr) + 9] = ')';
00571 readerInfoBuf[strlen(readerPtr) + 10] = '\0';
00572 CHECK_LEAKS();
00573 readerInfo = icl_NewTermFromString(readerInfoBuf);
00574 free(readerPtr);
00575 free(readerInfoBuf);
00576 senderPtr = pointerToString(realTs);
00577 #ifdef NORMAL_GC
00578
00579
00580
00581
00582
00583
00584
00585 termSenders[++termSendersIdx] = realTs;
00586 #endif
00587 senderInfoBuf = (char*)malloc(strlen(senderPtr) + 10 + 1);
00588 strncpy(senderInfoBuf, "sender('", 8);
00589 strncpy(senderInfoBuf + 8, senderPtr, strlen(senderPtr));
00590 senderInfoBuf[strlen(senderPtr) + 8] = '\'';
00591 senderInfoBuf[strlen(senderPtr) + 9] = ')';
00592 senderInfoBuf[strlen(senderPtr) + 10] = '\0';
00593 senderInfo = icl_NewTermFromString(senderInfoBuf);
00594 free(senderPtr);
00595 free(senderInfoBuf);
00596 }
00597
00598 if (connection>0) {
00599 com_set_connect_info(ConnectionId, Params, host, port, readerInfo, senderInfo, connection);
00600 }
00601 else {
00602 char* format = get64BitFormatWrapped("com_Connect: failed to connect to %s ", ".");
00603 fprintf(stderr, format, icl_Str(host), icl_Int(port));
00604 fprintf(stderr, "\n");
00605 free(format);
00606 }
00607 if (!host && !port)
00608 fprintf(stderr, "Make sure you have a valid setup.pl file\n");
00609 }
00610 else {
00611 char* addressString = icl_NewStringFromTerm(Address);
00612 fprintf(stderr, "%s Error! Bad arguments to com_Connect(): connectionId = %s address = %s\n", __FILE__, ConnectionId, addressString);
00613 icl_stFree(addressString);
00614 }
00615
00616 icl_Free(t1);
00617 #ifdef NORMAL_GC
00618 printf("Leaving comConnectFormat\n");
00619 #endif
00620
00621 if(port) icl_Free(port);
00622 if(host) icl_Free(host);
00623 return connection;
00624
00625 failedconnect:
00626 if(port) icl_Free(port);
00627 if(host) icl_Free(host);
00628 return -1;
00629 }
00630
00637 EXPORT_MSCPP
00638 int EXPORT_BORLAND
00639 com_Disconnect(char *ConnectionId)
00640 {
00641 ICLTerm *t1, *info = NULL;
00642
00643 if (ConnectionId == NULL) {
00644 return com_disconnect_all_connection_ids();
00645 }
00646
00647 if (ConnectionId && *ConnectionId && commdb) {
00648 if (com_GetInfo(ConnectionId,
00649 (t1 = icl_NewStruct("connection", 1,
00650 icl_NewVar("_"))), &info)) {
00651
00652 tcpShutdown(icl_Int(icl_NthTerm(info, 1)));
00653 icl_Free(info);
00654 icl_Free(t1);
00655 db_Retract(commdb,
00656 (t1 = icl_NewStruct("com_connection_info", 5,
00657 icl_NewStr(ConnectionId),
00658 icl_NewVar("TCP"),
00659 icl_NewVar("_Type"),
00660 icl_NewVar("_Info"),
00661 icl_NewStr("connected"))),
00662 ICL_EMPTY);
00663 }
00664 icl_Free(t1);
00665 return TRUE;
00666 }
00667 else return FALSE;
00668 }
00669
00673 EXPORT_MSCPP
00674 SOCKET EXPORT_BORLAND
00675 com_Connected(char* ConnectionId)
00676 {
00677 ICLTerm *t1;
00678
00679
00680 if (ConnectionId && *ConnectionId) {
00681 ICLTerm *currentStatus = NULL;
00682 com_GetInfo(ConnectionId,
00683 (t1 = icl_NewStruct("status", 1,
00684 icl_NewStr("connected"))),¤tStatus);
00685 icl_Free(t1);
00686 if (currentStatus && STREQ(icl_Str(currentStatus),"connected")) {
00687 icl_Free(currentStatus);
00688 return TRUE;
00689 }
00690 }
00691 return FALSE;
00692 }
00693
00694
00695
00696
00697
00698
00699 EXPORT_MSCPP
00700 int EXPORT_BORLAND
00701 com_GetAllValidConnections(ICLTerm** result)
00702 {
00703 int index;
00704 ICLTerm *resultFromDb = (ICLTerm *)NULL;
00705 static ICLTerm* info = NULL;
00706
00707 if (!icl_IsValid(info)) {
00708 info = icl_NewTermFromData("com_connection_info(Id,Protocol,Type,InfoList,connected)",56);
00709 }
00710
00711 *result = icl_NewList(NULL);
00712
00713 if (db_Solve(commdb, info, ICL_EMPTY, &resultFromDb)) {
00714
00715
00716
00717
00718
00719
00720 ICLTerm *item = NULL;
00721
00722
00723
00724
00725
00726
00727 for (index = 1; index <= icl_ListLen(resultFromDb) ; index ++) {
00728 item = icl_NthTerm(resultFromDb, index);
00729 icl_AddToList(*result, icl_CopyTerm(icl_NthTerm(item, 1)), TRUE);
00730 }
00731 icl_Free(resultFromDb);
00732 return TRUE;
00733 }
00734 icl_Free(resultFromDb);
00735 return FALSE;
00736 };
00737
00742 int com_GetConnectionFromInfo(ICLTerm *Info, char** connectionName)
00743 {
00744 ICLTerm *resultFromDb = (ICLTerm *)NULL;
00745 int res = FALSE;
00746 static ICLTerm* info = NULL;
00747
00748 if (!icl_IsValid(info)) {
00749 info = icl_NewTermFromData("com_connection_info(Conn,Protocol,Type,InfoList,Status)",55);
00750 }
00751
00752 (void)Info;
00753
00754 if (db_Solve(commdb, info, ICL_EMPTY, &resultFromDb)) {
00755 *connectionName =
00756 strdup(icl_Str(icl_NthTerm(icl_NthTerm(resultFromDb, 1),1)));
00757 res = TRUE;
00758 icl_Free(resultFromDb);
00759 }
00760 return res;
00761 }
00762
00779 EXPORT_MSCPP
00780 int EXPORT_BORLAND
00781 com_ListenAt(char *ConnectionId, ICLTerm *Params, ICLTerm *Address)
00782 {
00783 ICLTerm *connectionInfo = NULL, *host = NULL, *port = NULL;
00784 ICLTerm *info = NULL, *t1 = NULL;
00785 ICLTerm *resolve_vars_false = icl_NewTermFromData("resolve_vars(false)",19);
00786 int do_resolve_vars = TRUE;
00787 char cmd[600];
00788 char *addressString = NULL;
00789 char *hostString = NULL;
00790 int res = FALSE;
00791 ICLTerm* addrFromCmdLine = NULL;
00792 ICLTerm* addrFromSetup = NULL;
00793 ICLTerm* addrFromEnv = NULL;
00794
00795
00796 if (!commdb)
00797 commdb = db_NewDB();
00798
00799 t1 = icl_NewStruct("tcp", 2, icl_NewVar("Host"), icl_NewVar("Port"));
00800 if (ConnectionId && *ConnectionId && icl_Unify(Address, t1, &connectionInfo)) {
00801
00802
00803 host = icl_NthTerm(connectionInfo, 1);
00804 port = icl_NthTerm(connectionInfo, 2);
00805
00806
00807 do_resolve_vars = !icl_GetParamValue(resolve_vars_false, Params, NULL);
00808 icl_Free(resolve_vars_false);
00809 if (do_resolve_vars && (icl_IsVar(host) || icl_IsVar(port))) {
00810 oaa_ResolveVariable("oaa_connect", &addrFromSetup);
00811 Address = addrFromSetup;
00812 addressString = getenv("OAA_CONNECT");
00813 if (addressString) {
00814 icl_Free(Address);
00815 addrFromEnv = icl_NewTermFromString(addressString);
00816 Address = addrFromEnv;
00817 }
00818 if(oaa_ResolveVariable("-oaa_connect", &addrFromCmdLine)) {
00819 icl_Free(Address);
00820 Address = addrFromCmdLine;
00821 }
00822 }
00823
00824 icl_Free(connectionInfo);
00825
00826
00827 icl_Unify(Address, t1, &connectionInfo);
00828
00829
00830 host = icl_NthTerm(connectionInfo, 1);
00831 port = icl_NthTerm(connectionInfo, 2);
00832
00833 if (!icl_IsVar(host) && !icl_IsVar(port))
00834
00835 do {
00836
00837 hostString = strdup(icl_Str(host));
00838 if (!strcmp("localhost", hostString)) {
00839 icl_stFree(hostString);
00840 hostString = com_get_localhost_ip_address();
00841 }
00842
00843
00844 stringPort = icl_Int(port);
00845 binaryPort = stringPort + OAA_BINARY_PORT_OFFSET;
00846 stringListener = tcpListenAtPort(stringPort);
00847 binaryListener = tcpListenAtPort(binaryPort);
00848 if ( (stringListener > 0) && (binaryListener > 0) ) {
00849 sprintf(cmd, "com_connection_info('%s', tcp, server, \
00850 [oaa_address(addr(tcp('%s',%d))), oaa_host('%s'), oaa_port(%d)], \
00851 connected)",
00852 ConnectionId, hostString, stringPort,
00853 hostString, stringPort);
00854 info = icl_NewTermFromString(cmd);
00855
00856 if (info) {
00857 db_Assert(commdb, info, ICL_EMPTY);
00858 icl_Free(info);
00859 }
00860 else {
00861 printf("Error! %s:%i com_connection_info could not be constructed.\n", __FILE__, __LINE__);
00862 }
00863 res = TRUE;
00864 }
00865 else {
00866 if (!com_ask_about_tcp_exception(icl_Int(port),
00867 icl_NewStringFromTerm(host))) {
00868 icl_Free(connectionInfo);
00869 icl_Free(t1);
00870 icl_stFree(hostString);
00871 return FALSE;
00872 }
00873 }
00874 } while (!res);
00875 icl_Free(connectionInfo);
00876 }
00877 else {
00878 printf("Error! Bad arguments to com_ListenAt()\n");
00879 db_Assert(commdb, info, ICL_EMPTY);
00880 }
00881 icl_Free(t1);
00882 icl_stFree(hostString);
00883 return res;
00884 }
00885
00886
00887
00888
00889
/*
 * Print the explanation shown when the user answers "h" to the
 * "unable to access port" prompt. Prints nothing when built with _WINDOWS.
 */
void
com_print_tcp_exception_help() {
#ifdef _WINDOWS
#else
    static const char *const helpLines[] = {
        "I've just attempted to listen on the specified port, but was unable\n",
        "to gain control of it. This could be because there's already a\n",
        "Facilitator, or some other program, making use of that port. Or, it\n",
        "could be that a Facilitator using that port was just terminated. In\n",
        "such cases, the port may be inaccessible for a brief period (usually\n",
        "only a few seconds, but sometimes more). It may help to kill any\n",
        "client agents which may still be connected to the defunct Facilitator.\n",
        "If you think the specified port may now be accessible, enter \"y\" and\n",
        "I'll try again. You may request retry any number of times.\n",
        "If you want me to listen on a different port, enter \"n\", which will\n",
        "cause me to terminate. Then change your port specification (it's\n",
        "either in a setup file or an environment variable). Then restart me.\n"
    };
    size_t lineIndex;

    for (lineIndex = 0; lineIndex < sizeof helpLines / sizeof helpLines[0]; lineIndex++) {
        fputs(helpLines[lineIndex], stdout);
    }
#endif
}
00911
/*
 * Ask the user on stdin whether to retry listening on Host:Port.
 *
 * Returns 1 for "y" and 0 for "n". "h" prints the help text and asks again,
 * as does any other answer. Returns 0 (do not retry) when no answer can be
 * read, e.g. at end of input.
 *
 * Fixes: the answer is read with a bounded field width, so it cannot
 * overflow the 100-byte buffer; the scanf() result is checked, so EOF no
 * longer loops forever on an uninitialised buffer; and the format no longer
 * ends in "\n", which made scanf() wait for further non-blank input.
 */
int
com_ask_about_tcp_exception(int Port, char *Host) {

    char response[100];

    for (;;) {
        printf("Currently unable to access %s port %d.\n Try again? [y)es, n)o, h(elp] ",
               Host ? Host : "(unknown host)", Port);
        /* The prompt has no trailing newline, so push it out explicitly. */
        fflush(stdout);

        if (scanf("%99s", response) != 1)
            return 0;

        if (strcmp(response, "y") == 0)
            return 1;
        if (strcmp(response, "n") == 0)
            return 0;
        if (strcmp(response, "h") == 0)
            com_print_tcp_exception_help();
    }
}
00934
00939 int com_get_socket_from_connection_id(char* ConnectionId) {
00940 ICLTerm *info = NULL;
00941 int socketNumber = -1;
00942
00943
00944
00945
00946
00947
00948
00949 ICLTerm* requestTerm = icl_NewStruct("connection", 1, icl_NewVar("_"));
00950 if (ConnectionId && *ConnectionId && commdb &&
00951 com_GetInfo(ConnectionId, requestTerm, &info)) {
00952 icl_NthTermAsInt(info, 1, &socketNumber);
00953 icl_Free(info);
00954 }
00955 icl_Free(requestTerm);
00956
00957 return socketNumber;
00958 }
00959
00964 int basic_send(SOCKET inSocket, char *Data, int Size)
00965 {
00966 int res = FALSE;
00967
00968
00969 if (inSocket>0) {
00970 #ifdef _WINDOWS
00971 if (send(inSocket, Data, Size,0) == SOCKET_ERROR)
00972 res = -1;
00973 #else
00974 res = (write(inSocket, Data, Size) == Size);
00975 #endif
00976 }
00977 return res;
00978 }
00979
00984 EXPORT_MSCPP
00985 int EXPORT_BORLAND
00986 com_SendData(char *ConnectionId, char *Data, int Size)
00987 {
00988 int res = FALSE;
00989 int socketNumber = com_get_socket_from_connection_id(ConnectionId);
00990
00991
00992 if (socketNumber>0) {
00993 char *buffer = (char*)malloc(Size+8);
00994 sprintf(buffer,"term(%s).", Data);
00995
00996
00997
00998
00999
01000
01001 basic_send(socketNumber, buffer, strlen(buffer));
01002 buffer[0] = 10;
01003 basic_send(socketNumber, buffer, 1);
01004
01005 free(buffer);
01006 }
01007 return res;
01008 }
01009
01010 EXPORT_MSCPP
01011 int EXPORT_BORLAND
01012 com_SendTerm(char* connectionId, ICLTerm* term)
01013 {
01014 ICLTerm* result;
01015 ICLTerm* query = icl_NewStruct("sender", 1, icl_NewVar("_"));
01016 char* pointerStr;
01017 TermSender* ts;
01018 if(!com_GetInfo(connectionId, query, &result)) {
01019 icl_Free(query);
01020 return FALSE;
01021 }
01022
01023 pointerStr = icl_Str(icl_NthTerm(result, 1));
01024 pointerStr = strdup(pointerStr);
01025 icl_stRemoveQuotes(pointerStr);
01026 ts = (TermSender*)stringToPointer(pointerStr);
01027 free(pointerStr);
01028 termSender_sendTerm(ts, term);
01029 if(termSender_getError(ts) != TERMSENDER_OKAY) {
01030 fprintf(stderr, "com_SendTerm(char*, ICLTerm*) did not send term\n");
01031 icl_Free(query);
01032 icl_Free(result);
01033 return FALSE;
01034 }
01035 icl_Free(query);
01036 icl_Free(result);
01037 return TRUE;
01038 }
01039
01046 EXPORT_MSCPP
01047 int EXPORT_BORLAND
01048 com_AddInfo(char *ConnectionId, ICLTerm *NewInfo)
01049 {
01050 ICLTerm *info = NULL, *result = NULL;
01051 char cmd[200];
01052 int changed = FALSE;
01053 int res = FALSE;
01054
01055
01056 if (!ConnectionId || !*ConnectionId || !commdb)
01057 return FALSE;
01058
01059 sprintf(cmd, "com_connection_info('%s', Protocol, Type,InfoList,Status)",
01060 ConnectionId);
01061 info = icl_NewTermFromString(cmd);
01062
01063 if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01064 ICLTerm *ninfo = NULL, *item;
01065 ICLTerm *protocol, *type, *infolist, *status;
01066 ICLTerm* templateProtocol = icl_NewStruct("protocol", 1, icl_NewVar("_"));
01067 ICLTerm* templateStatus = icl_NewStruct("status", 1, icl_NewVar("_"));
01068 ICLTerm* templateType = icl_NewStruct("type", 1, icl_NewVar("_"));
01069
01070 db_Retract(commdb, info, ICL_EMPTY);
01071 item = icl_NthTerm(result, 1);
01072
01073 if (icl_Unify(templateProtocol, NewInfo, NULL)) {
01074 protocol = icl_NthTerm(NewInfo, 1);
01075 changed = TRUE;
01076 }
01077 else protocol = icl_NthTerm(item, 2);
01078
01079 if (icl_Unify(templateType, NewInfo, NULL)) {
01080 type = icl_NthTerm(NewInfo, 1);
01081 changed = TRUE;
01082 }
01083 else type = icl_NthTerm(item, 3);
01084
01085 if (icl_Unify(templateStatus, NewInfo, NULL)) {
01086 status = icl_NthTerm(NewInfo, 1);
01087 changed = TRUE;
01088 }
01089 else status = icl_NthTerm(item, 5);
01090
01091 if (!changed) {
01092 if (icl_IsList(NewInfo)) {
01093 ICLListType *p = icl_List(NewInfo);
01094 while (p) {
01095 icl_AddToList(icl_NthTerm(item, 4), icl_CopyTerm(p->elt), TRUE);
01096 p = p->next;
01097 }
01098 }
01099 else
01100 icl_AddToList(icl_NthTerm(item, 4), icl_CopyTerm(NewInfo), TRUE);
01101 }
01102 infolist = icl_NthTerm(item, 4);
01103
01104 ninfo = icl_NewStruct("com_connection_info", 5, icl_NewStr(ConnectionId),
01105 icl_CopyTerm(protocol), icl_CopyTerm(type),
01106 icl_CopyTerm(infolist), icl_CopyTerm(status));
01107
01108 db_Assert(commdb, ninfo, ICL_EMPTY);
01109
01110 icl_Free(ninfo);
01111 icl_Free(result);
01112 icl_Free(templateProtocol);
01113 icl_Free(templateStatus);
01114 icl_Free(templateType);
01115 res = TRUE;
01116 }
01117 else {
01118 res = FALSE;
01119 }
01120 icl_Free(info);
01121 return res;
01122 }
01123
01128 EXPORT_MSCPP
01129 int EXPORT_BORLAND
01130 com_RemoveInfo(char *connectionId, ICLTerm *toMatch)
01131 {
01132 ICLTerm *info = NULL, *result = NULL;
01133 char cmd[200];
01134 int res = FALSE;
01135
01136
01137 if (!connectionId || !*connectionId || !commdb)
01138 return FALSE;
01139 sprintf(cmd, "com_connection_info('%s', Protocol, Type,InfoList,Status)",
01140 connectionId);
01141 info = icl_NewTermFromString(cmd);
01142
01143 if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01144 ICLTerm *ninfo = NULL, *item;
01145 ICLTerm *protocol, *existingList, *type, *status;
01146
01147 db_Retract(commdb, info, ICL_EMPTY);
01148 item = icl_NthTerm(result, 1);
01149
01150 protocol = icl_NthTerm(item, 2);
01151 type = icl_NthTerm(item, 3);
01152 status = icl_NthTerm(item, 5);
01153
01154 existingList = icl_NthTerm(item, 4);
01155 if (icl_IsList(toMatch)) {
01156 ICLListType *p;
01157 for(p = icl_List(toMatch); p; p = p->next) {
01158 ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(p->elt);
01159 icl_RemoveUnifyingFromList(existingList, unifier, TRUE);
01160 icl_FreeTerm(unifier);
01161 }
01162 }
01163 else {
01164 ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(toMatch);
01165 icl_RemoveUnifyingFromList(existingList, unifier, TRUE);
01166 icl_FreeTerm(unifier);
01167 }
01168
01169 ninfo = icl_NewStruct("com_connection_info", 5, icl_NewStr(connectionId),
01170 icl_CopyTerm(protocol), icl_CopyTerm(type),
01171 icl_CopyTerm(existingList), icl_CopyTerm(status));
01172
01173 db_Assert(commdb, ninfo, ICL_EMPTY);
01174
01175 icl_Free(ninfo);
01176 icl_Free(result);
01177 res = TRUE;
01178 }
01179 else {
01180 res = FALSE;
01181 }
01182 icl_Free(info);
01183 return res;
01184
01185 }
01186
01193 EXPORT_MSCPP
01194 int EXPORT_BORLAND
01195 com_UpdateInfo(char *ConnectionId, ICLTerm *NewInfo)
01196 {
01197 ICLTerm *info = NULL, *result = NULL;
01198 char cmd[200];
01199 int changed = FALSE;
01200 int res = FALSE;
01201
01202
01203 if (!ConnectionId || !*ConnectionId || !commdb)
01204 return FALSE;
01205 sprintf(cmd, "com_connection_info('%s', Protocol, Type,InfoList,Status)",
01206 ConnectionId);
01207 info = icl_NewTermFromString(cmd);
01208
01209 if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01210 ICLTerm *ninfo = NULL, *item;
01211 ICLTerm *protocol, *existingList, *type, *status;
01212 ICLTerm* templateProtocol = icl_NewStruct("protocol", 1, icl_NewVar("_"));
01213 ICLTerm* templateStatus = icl_NewStruct("status", 1, icl_NewVar("_"));
01214 ICLTerm* templateType = icl_NewStruct("type", 1, icl_NewVar("_"));
01215
01216 db_Retract(commdb, info, ICL_EMPTY);
01217 item = icl_NthTerm(result, 1);
01218
01219 if (icl_Unify(templateProtocol, NewInfo, NULL)) {
01220 protocol = icl_NthTerm(NewInfo, 1);
01221 changed = TRUE;
01222 }
01223 else protocol = icl_NthTerm(item, 2);
01224
01225 if (icl_Unify(templateType, NewInfo, NULL)) {
01226 type = icl_NthTerm(NewInfo, 1);
01227 changed = TRUE;
01228 }
01229 else type = icl_NthTerm(item, 3);
01230
01231 if (icl_Unify(templateStatus, NewInfo, NULL)) {
01232 status = icl_NthTerm(NewInfo, 1);
01233 changed = TRUE;
01234 }
01235 else status = icl_NthTerm(item, 5);
01236
01237 existingList = icl_NthTerm(item, 4);
01238 if (!changed) {
01239 if (icl_IsList(NewInfo)) {
01240 ICLListType *p;
01241 for(p = icl_List(NewInfo); p; p = p->next) {
01242 ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(p->elt);
01243 int numReplaced = icl_ReplaceUnifying(existingList, unifier, p->elt, TRUE);
01244
01245 if(numReplaced < 1) {
01246 icl_AddToList(existingList, icl_CopyTerm(p->elt), TRUE);
01247 }
01248 icl_FreeTerm(unifier);
01249 }
01250 }
01251 else {
01252 ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(NewInfo);
01253 int numReplaced = icl_ReplaceUnifying(existingList, unifier, NewInfo, TRUE);
01254 if(numReplaced < 1) {
01255 icl_AddToList(existingList, icl_CopyTerm(NewInfo), TRUE);
01256 }
01257 icl_FreeTerm(unifier);
01258 }
01259 }
01260
01261 ninfo = icl_NewStruct("com_connection_info", 5, icl_NewStr(ConnectionId),
01262 icl_CopyTerm(protocol), icl_CopyTerm(type),
01263 icl_CopyTerm(existingList), icl_CopyTerm(status));
01264
01265 db_Assert(commdb, ninfo, ICL_EMPTY);
01266
01267 icl_Free(ninfo);
01268 icl_Free(result);
01269 icl_Free(templateProtocol);
01270 icl_Free(templateStatus);
01271 icl_Free(templateType);
01272 res = TRUE;
01273 }
01274 else {
01275 res = FALSE;
01276 }
01277 icl_Free(info);
01278 return res;
01279 }
01280
01281 static void com_getMatchingInfos(ICLTerm const*info, ICLTerm **result)
01282 {
01283 ICLTerm* allMatcher = icl_NewStruct("com_connection_info", 5, icl_NewVar("_"), icl_NewVar("_"), icl_NewVar("_"), icl_NewVar("_"), icl_NewVar("_"));
01284 ICLTerm* allMatches = NULL;
01285 *result = icl_NewList(NULL);
01286 if(db_Solve(commdb, allMatcher, ICL_EMPTY, &allMatches)) {
01287
01288
01289 ICLListType* listElem;
01290 for(listElem = icl_List(allMatches); listElem; listElem = icl_ListNext(listElem)) {
01291 if(icl_Member(info, icl_NthTerm(icl_ListElt(listElem), 4), NULL)) {
01292 icl_AddToList(*result, icl_CopyTerm(icl_ListElt(listElem)), FALSE);
01293 }
01294 }
01295 icl_Free(allMatches);
01296 }
01297 icl_Free(allMatcher);
01298 }
01299
01304 EXPORT_MSCPP
01305 int EXPORT_BORLAND
01306 com_GetInfo(char *ConnectionId, ICLTerm *GInfo, ICLTerm **Result)
01307 {
01308 ICLTerm *info = NULL, *result = NULL;
01309 char cmd[200];
01310 int res = FALSE;
01311
01312
01313
01314
01315
01316
01317 if (!ConnectionId || !*ConnectionId || !commdb)
01318 return FALSE;
01319
01320 sprintf(cmd, "com_connection_info(%s, Protocol, Type,InfoList,Status)",
01321 ConnectionId);
01322 info = icl_NewTermFromString(cmd);
01323
01324 if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01325
01326 ICLTerm *item = NULL;
01327 ICLTerm* templateProtocol = icl_NewStruct("protocol", 1, icl_NewVar("_"));
01328 ICLTerm* templateStatus = icl_NewStruct("status", 1, icl_NewVar("_"));
01329 ICLTerm* templateType = icl_NewStruct("type", 1, icl_NewVar("_"));
01330
01331
01332
01333
01334 res = TRUE;
01335 item = icl_NthTerm(result, 1);
01336
01337 if (icl_Unify(templateProtocol, GInfo, NULL)) {
01338 if (Result!=NULL) {
01339 *Result = icl_CopyTerm(icl_NthTerm(item, 2));
01340 }
01341 res = TRUE;
01342 }
01343 else if (icl_Unify(templateStatus, GInfo, NULL)){
01344 if (Result!=NULL) {
01345 *Result = icl_CopyTerm(icl_NthTerm(item, 5));
01346 }
01347 res = TRUE;
01348 }
01349 else if (icl_Unify(templateType, GInfo, NULL)){
01350 if (Result!=NULL) {
01351 *Result = icl_CopyTerm(icl_NthTerm(item, 3));
01352 }
01353 res = TRUE;
01354 }
01355 else if (Result!=NULL) {
01356 ICLTerm* tempList = icl_NthTerm(item, 4);
01357
01358 res = icl_Member(GInfo, tempList, Result);
01359 }
01360
01361 icl_Free(templateProtocol);
01362 icl_Free(templateStatus);
01363 icl_Free(templateType);
01364 icl_Free(result);
01365 }
01366 else {
01367 res = FALSE;
01368 }
01369 icl_Free(info);
01370
01371 return res;
01372 }
01373
01374
01379 EXPORT_MSCPP
01380 int EXPORT_BORLAND
01381 com_GetConnectionId(char **ConnectionId, ICLTerm *GInfo)
01382 {
01383 static ICLTerm *info = NULL;
01384 ICLTerm *result = NULL;
01385 int res = FALSE;
01386
01387 if (!icl_IsValid(info)) {
01388 info = icl_NewTermFromData("com_connection_info(ConnectionId,Protocol,Type,InfoList,Status)",63);
01389 }
01390
01391
01392
01393
01394 if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01395
01396 ICLListType *reslist = icl_List(result);
01397
01398 while(reslist != NULL) {
01399 ICLTerm *current = reslist->elt;
01400 ICLTerm *infolist = icl_NthTerm(current, 4);
01401 reslist = reslist->next;
01402
01403
01404
01405
01406 if(icl_ParamValue(icl_Functor(GInfo), icl_NthTerm(GInfo, 1),
01407 infolist, NULL)) {
01408 *ConnectionId = strdup(icl_Str(icl_NthTerm(current, 1)));
01409 res = TRUE;
01410 break;
01411 }
01412 }
01413 icl_Free(result);
01414
01415 }
01416 return res;
01417 }
01418
01419
01420
01421
01422
01423
01424
01425
01426 EXPORT_MSCPP
01427 int EXPORT_BORLAND
01428 com_SelectEvent(char* inConnectionId, double timeout, ICLTerm **event){
01429 ICLTerm* readerTerm;
01430 ICLTerm* query;
01431 TermReader* reader;
01432 char* readerString;
01433
01434 query = icl_NewTermFromData("reader(_)",9);
01435 if(!com_GetInfo(inConnectionId, query, &readerTerm)) {
01436 icl_Free(query);
01437 fprintf(stderr, "com_SelectEvent could not find reader\n");
01438 return FALSE;
01439 }
01440 readerString = icl_Str(icl_NthTerm(readerTerm, 1));
01441 readerString = strdup(readerString);
01442 icl_stRemoveQuotes(readerString);
01443 reader = (TermReader*)stringToPointer(readerString);
01444 free(readerString);
01445 *event = termReader_getNextTerm(reader, timeout);
01446 icl_Free(query);
01447 icl_Free(readerTerm);
01448 if(*event == NULL) {
01449 int te = termReader_getError(reader);
01450 fprintf(stderr, "com_SelectEvent error in reading term: %i\n", te);
01451 return FALSE;
01452 }
01453 else {
01454 return TRUE;
01455 }
01456 }
01457
01461 EXTERN void comPrintError(char *str, ...) {
01462 char buf[512];
01463 va_list ptr;
01464 va_start(ptr,str);
01465 vsprintf(buf,str,ptr);
01466 printf("COM ERROR : %s\n", buf);
01467 va_end(ptr);
01468 }
01469
01470
01471
01472
01473
01474 #ifdef _WINDOWS
01475
01476 int EXPORT_MSCPP tcpStartup()
01477 {
01478 WORD wVersionRequested;
01479 WSADATA wsaData;
01480
01481
01482 wVersionRequested=MAKEWORD(1,1);
01483
01484 if (WSAStartup(wVersionRequested,&wsaData)!=0)
01485 return (0);
01486
01487 if (LOBYTE(wsaData.wVersion)!=1 ||
01488 HIBYTE(wsaData.wVersion)!=1) {
01489 WSACleanup();
01490 return(0);
01491 }
01492
01493 return(1);
01494 }
01495
01496 void EXPORT_MSCPP tcpTestDLL(void)
01497 {
01498 MessageBox(NULL, "Testing, one ,two, ...", "TCP Dll", MB_OK);
01499 }
01500
01501 #endif
01502
01503
01504
01505
01506
01511 void tcpShutdown(SOCKET socket) {
01512 shutdown(socket,2);
01513 #ifdef _WINDOWS
01514 shutdown(socket, 0);
01515 #else
01516 close(socket);
01517 #endif
01518 }
01519
#ifndef INADDR_NONE
#define INADDR_NONE 0xffffffff
#endif

/**
 * Resolves a host given by name or by dotted IPv4 address.
 *
 * gethostbyname() is tried first; when it fails the string is parsed with
 * inet_addr() and, if it is a valid address, looked up with
 * gethostbyaddr().
 *
 * @param hostName host name or dotted-quad address
 * @return the host entry returned by the resolver (not to be freed by the
 *         caller), or NULL when hostName is NULL or cannot be resolved
 */
struct hostent* getHostEntry(char const* hostName)
{
    struct hostent *he;

    if (!hostName) {
        return NULL;
    }

    he = gethostbyname(hostName);
    if (!he) {
        /* struct in_addr has the exact size of an IPv4 address on every
         * platform, unlike u_long which is 8 bytes on LP64 systems. */
        struct in_addr addr;

        addr.s_addr = inet_addr(hostName);
        if (addr.s_addr != INADDR_NONE) {
            he = gethostbyaddr((char *)&addr, sizeof(addr), AF_INET);
        }
    }

    return he;
}
01536
01543 SOCKET tcpConnect(char *hostName, int port)
01544 {
01545 SOCKET s;
01546 int ret;
01547 struct sockaddr_in saddr;
01548 struct hostent *he;
01549
01550 he = getHostEntry(hostName);
01551 if (!he) {
01552 printf("TCP - Could not resolve host: %s\n", hostName);
01553 return -1;
01554 }
01555
01556 ret = socket(AF_INET, SOCK_STREAM, 0);
01557
01558 if (ret < 0) {
01559 printf("TCP - Could not create socket.\n");
01560 return -1;
01561 }
01562 else s = ret;
01563
01564
01565 #ifndef _WINDOWS
01566 memset((char*)&saddr, 0, sizeof(saddr));
01567 memcpy((char*)&saddr.sin_addr.s_addr, he->h_addr, he->h_length);
01568 memset(&(saddr.sin_zero), '\0', 8);
01569 #else
01570 memcpy((char FAR *) &saddr.sin_addr, (char FAR *)he->h_addr, he->h_length);
01571 #endif
01572
01573 saddr.sin_family = AF_INET;
01574 saddr.sin_port = htons(port);
01575
01576 if (connect(s, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
01577 printf("TCP - Could not connect to %s #%d\n", hostName,port);
01578 perror("Reason");
01579 tcpShutdown(s);
01580 return -1;
01581 }
01582
01583 return s;
01584 }
01585
01586
01593 int tcpSelect(SOCKET socket, double timeout) {
01594
01595 struct timeval time, *timep = NULL;
01596 #ifndef _WINDOWS
01597 struct timezone timezone;
01598 #endif
01599
01600 int width;
01601 fd_set readfds;
01602
01603 if (timeout > 0) {
01604 #ifndef _WINDOWS
01605 if(gettimeofday(&time, &timezone) == -1)
01606 printf("TCP - error getting time of data\n");
01607 else {
01608 #endif
01609 long i;
01610 i = (long)timeout;
01611 time.tv_sec = i;
01612 time.tv_usec = 0.5 + (timeout - i) * 1e6;
01613 timep = &time;
01614 #ifndef _WINDOWS
01615 }
01616 #endif
01617 }
01618
01619 width = socket + 1;
01620 FD_ZERO(&readfds);
01621 FD_SET(socket, &readfds);
01622
01623 return select(width, &readfds, NULL, NULL, timep);
01624 }
01625
01632 EXPORT_MSCPP
01633 char * EXPORT_BORLAND
01634 com_GetDefaultConnectionId()
01635 {
01636 static ICLTerm *info = NULL;
01637 ICLTerm *answers = NULL;
01638 char *result = NULL;
01639
01640 if (!icl_IsValid(info)) {
01641 info = icl_NewTermFromData(
01642 "com_connection_info(Id,Protocol,Type,InfoList,Status)",53);
01643 }
01644
01645
01646 if (!commdb)
01647 return NULL;
01648
01649 if (db_Solve(commdb, info, ICL_EMPTY, &answers)) {
01650 ICLTerm *item = icl_NthTerm(answers, 1);
01651 result = strdup(icl_Str(icl_NthTerm(item, 1)));
01652 }
01653 icl_Free(answers);
01654 return result;
01655 }
01656
01662 EXPORT_MSCPP
01663 void EXPORT_BORLAND
01664 cnx_CanonicalAddress(ICLTerm *address, SOCKET socket, ICLTerm **result)
01665 {
01666 ICLTerm *addressStruct = icl_NewTermFromData("tcp(Host,Port)",14);
01667
01668 if (icl_Unify(addressStruct, address, NULL) && (socket>0)) {
01669 struct sockaddr_in* ptr_distaddr;
01670 struct sockaddr distaddr;
01671 int remotePortNumber = -1;
01672 char remoteHostName[255];
01673
01674 #ifdef _WINDOWS
01675 int addrlen = sizeof(distaddr);
01676 #else
01677 socklen_t addrlen = sizeof(distaddr);
01678 #endif
01679
01680 sprintf(remoteHostName, "unknown");
01681 ptr_distaddr = (struct sockaddr_in *)&(distaddr);
01682
01683 if (getpeername(socket, &distaddr, &addrlen)==0) {
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693 remotePortNumber = icl_Int(icl_NthTerm(address, 2));
01694
01695
01696
01697
01698
01699
01700
01701
01702 sprintf(remoteHostName,"%s",inet_ntoa(ptr_distaddr->sin_addr));
01703
01704
01705 if (result!=NULL)
01706 *result = icl_NewStruct("addr", 1,
01707 icl_NewStruct("tcp", 2,
01708 icl_NewStr(remoteHostName),
01709 icl_NewInt(remotePortNumber)));
01710
01711 }
01712 else {
01713 comPrintError("%s",
01714 "cnx_CanonicalAddress() can only be called on CONNECTED socket!");
01715 }
01716 }
01717 icl_Free(addressStruct);
01718 }
01719
01720
01721
01722
01723
01724 static void checkEventBuffer() {
01725 if (eventBuffer == NULL) {
01726 eventBuffer = icl_NewList(NULL);
01727 }
01728 }
01729
01730 static void com_accept_new_connection(int listenersocket) {
01731 int connection = -1;
01732 char* namestring = NULL;
01733 ICLTerm *readerInfo = NULL;
01734 ICLTerm *senderInfo = NULL;
01735 ICLTerm *ev_conn = NULL;
01736
01737 connection = accept(listenersocket, NULL, NULL);
01738
01739 if (connection > 0) {
01740 ICLTerm *canonicalOtherAddress = NULL;
01741 ICLTerm *newConnectionInfo = NULL;
01742 ICLTerm *tempList = icl_NewList(NULL);
01743 char *ConnectionId = new_direct_client_id();
01744 ICLTerm *templateTcpAddr = icl_NewTermFromData("tcp(Host,Port)",14);
01745 char buf[1000];
01746
01747 memset(buf, 0, sizeof(buf));
01748 cnx_CanonicalAddress(templateTcpAddr, connection, &canonicalOtherAddress);
01749
01750 icl_AddToList(tempList, icl_NewStruct("other_address", 1, canonicalOtherAddress), FALSE);
01751 icl_AddToList(tempList, icl_NewStruct("connection", 1, icl_NewInt(connection)),FALSE);
01752
01753 {
01754 TermReader* realTr = NULL;
01755 TermSender* realTs = NULL;
01756 char* readerPtr;
01757 char* readerInfoBuf;
01758 char* senderPtr;
01759 char* senderInfoBuf;
01760
01761 if (com_is_binary_listener(listenersocket)) {
01762 BinaryTermReader* btr;
01763 BinaryTermSender* bts;
01764 realTr = termReader_create();
01765 btr = binaryTermReader_create(realTr, connection);
01766 realTs = termSender_create();
01767 bts = binaryTermSender_create(realTs, connection);
01768 }
01769 else {
01770 StringTermReader* str;
01771 StringTermSender* sts;
01772 realTr = termReader_create();
01773 str = stringTermReader_create(realTr, connection);
01774 realTs = termSender_create();
01775 sts = stringTermSender_create(realTs, connection);
01776 }
01777
01778 readerPtr = pointerToString(realTr);
01779 #ifdef NORMAL_GC
01780
01781
01782
01783
01784
01785
01786
01787 termReaders[++termReadersIdx] = realTr;
01788 #endif
01789 readerInfoBuf = (char*)malloc(strlen(readerPtr) + 10 + 1);
01790 strncpy(readerInfoBuf, "reader('", 8);
01791 strncpy(readerInfoBuf + 8, readerPtr, strlen(readerPtr));
01792 readerInfoBuf[strlen(readerPtr) + 8] = '\'';
01793 readerInfoBuf[strlen(readerPtr) + 9] = ')';
01794 readerInfoBuf[strlen(readerPtr) + 10] = '\0';
01795 readerInfo = icl_NewTermFromString(readerInfoBuf);
01796 free(readerPtr);
01797 free(readerInfoBuf);
01798 senderPtr = pointerToString(realTs);
01799 #ifdef NORMAL_GC
01800
01801
01802
01803
01804
01805
01806
01807 termSenders[++termSendersIdx] = realTs;
01808 #endif
01809 senderInfoBuf = (char*)malloc(strlen(senderPtr) + 10 + 1);
01810 strncpy(senderInfoBuf, "sender('", 8);
01811 strncpy(senderInfoBuf + 8, senderPtr, strlen(senderPtr));
01812 senderInfoBuf[strlen(senderPtr) + 8] = '\'';
01813 senderInfoBuf[strlen(senderPtr) + 9] = ')';
01814 senderInfoBuf[strlen(senderPtr) + 10] = '\0';
01815 senderInfo = icl_NewTermFromString(senderInfoBuf);
01816 free(senderPtr);
01817 free(senderInfoBuf);
01818 }
01819
01820 icl_AddToList(tempList, readerInfo, FALSE);
01821 icl_AddToList(tempList, senderInfo, FALSE);
01822
01823 newConnectionInfo = icl_NewStruct("com_connection_info", 5,
01824 icl_NewStr(ConnectionId),
01825 icl_NewStr("tcp"),
01826 icl_NewStr("client"),
01827 tempList,
01828 icl_NewStr("connected"));
01829
01830 if (newConnectionInfo) {
01831 db_Assert(commdb, newConnectionInfo, ICL_EMPTY);
01832 icl_Free(newConnectionInfo);
01833 }
01834 else {
01835 printf("Error! %s:%i com_connection_info could not be constructed.\n", __FILE__, __LINE__);
01836 }
01837
01838 {
01839 ICLTerm *ConnEvent = NULL;
01840 ICLTerm *EventTerm = NULL;
01841 ICLTerm *t1 = NULL;
01842
01843
01844 com_select_event_from_socket(connection, 0.0, &EventTerm);
01845
01846 ConnEvent = icl_NthTerm(icl_NthTerm(EventTerm, 1), 1);
01847
01848 if (!icl_Unify(ConnEvent,
01849 (t1 = icl_NewStruct("ev_connect",
01850 1,
01851 icl_NewVar("_"))),
01852 NULL)) {
01853 fprintf(stderr, "Warning in com_accept_new_connection: ");
01854 fprintf(stderr, "Expected ev_connect, got %s\n", icl_Functor(ConnEvent));
01855 fflush(stderr);
01856 }
01857 icl_Free(t1);
01858 icl_Free(EventTerm);
01859 }
01860
01861 namestring = oaa_name_string();
01862 sprintf(buf, "event(ev_connected([other_name('%s'), "
01863 "other_language(c), other_type(client), "
01864 "other_version(%s), ascii(%d), binary(%d)]),[])",
01865 namestring, oaa_library_version_str,
01866 stringPort, binaryPort);
01867 icl_stFree(namestring);
01868
01869 ev_conn = icl_NewTermFromString(buf);
01870 com_SendTerm(ConnectionId, ev_conn);
01871 icl_Free(ev_conn);
01872 icl_Free(templateTcpAddr);
01873 free(ConnectionId);
01874 }
01875
01876 }
01877
01878 static int com_add_event_to_buffer(ICLTerm *event) {
01879 int answer = FALSE;
01880 checkEventBuffer();
01881
01882 answer = icl_AddToList(eventBuffer, event, TRUE);
01883 return answer;
01884 }
01885
01886 static int com_disconnect_all_connection_ids() {
01887 int answer = TRUE;
01888 ICLTerm *cids = NULL;
01889 ICLListType *cidslist = NULL;
01890 com_find_all_connection_ids(&cids);
01891 cidslist = icl_List(cids);
01892 while (icl_ListHasMoreElements(cidslist)) {
01893 char *cid = icl_Str(icl_ListElement(cidslist));
01894 if (cid != NULL) {
01895 answer = com_Disconnect(cid) && answer;
01896 }
01897 cidslist = icl_ListNextElement(cidslist);
01898 }
01899 icl_Free(cids);
01900 return answer;
01901 }
01902
01903
01904 static int com_find_all_connected_sockets(ICLTerm *ids, ICLTerm **sockets) {
01905 int numberOfSockets = 0;
01906 ICLListType *idlist = icl_List(ids);
01907 *sockets = icl_NewList(NULL);
01908 while(icl_ListHasMoreElements(idlist)) {
01909 char *id = icl_Str(icl_ListElement(idlist));
01910 int socket = com_get_socket_from_connection_id(id);
01911 if (socket > 0) {
01912 icl_AddToList(*sockets, icl_NewInt(socket), TRUE);
01913 numberOfSockets++;
01914 }
01915 idlist = icl_ListNextElement(idlist);
01916 }
01917 return numberOfSockets;
01918 }
01919
01920
01921 static int com_find_all_connection_ids(ICLTerm **ids) {
01922 int answer = FALSE;
01923 ICLTerm *result = NULL;
01924 ICLListType *resultlist;
01925 static ICLTerm *info = NULL;
01926
01927 if (!icl_IsValid(info)) {
01928 info = icl_NewTermFromData(
01929 "com_connection_info(ConnectionId,Protocol,Type,InfoList,Status)",63);
01930 }
01931
01932 if ( (answer = db_Solve(commdb, info, ICL_EMPTY, &result)) ) {
01933 resultlist = icl_List(result);
01934 *ids = icl_NewList(NULL);
01935 while(icl_ListHasMoreElements(resultlist)) {
01936 ICLTerm *item = icl_ListElement(resultlist);
01937 ICLTerm *id = icl_NthTerm(item,1);
01938 icl_AddToList(*ids, icl_CopyTerm(id), FALSE);
01939 resultlist = icl_ListNextElement(resultlist);
01940 }
01941 }
01942 icl_Free(result);
01943 return answer;
01944 }
01945
01946 static char* com_get_connection_id_from_socket(int socket) {
01947 char *answer = NULL;
01948 ICLTerm *cids = NULL;
01949 ICLListType *cidslist = NULL;
01950 com_find_all_connection_ids(&cids);
01951 cidslist = icl_List(cids);
01952 while (icl_ListHasMoreElements(cidslist)) {
01953 char *cid = icl_Str(icl_ListElement(cidslist));
01954 if (socket == com_get_socket_from_connection_id(cid)) {
01955 answer = strdup(cid);
01956 break;
01957 }
01958 cidslist = icl_ListNextElement(cidslist);
01959 }
01960 icl_Free(cids);
01961 return answer;
01962 }
01963
01967 EXPORT_MSCPP
01968 int EXPORT_BORLAND
01969 com_GetEventFromConnection(char *ConnectionId, double timeout, ICLTerm **event) {
01970 int socketNumber = com_get_socket_from_connection_id(ConnectionId);
01971 return com_select_event_from_socket(socketNumber, timeout, event);
01972 }
01973
01981 static char *com_get_localhost_ip_address(void) {
01982 struct hostent *h = NULL;
01983 char name[MAXHOSTNAMELEN + 1];
01984
01985 if ( (gethostname(name, sizeof(name)) != 0) ||
01986 ((h = gethostbyname(name)) == NULL) ) {
01987 fprintf(stderr,"Warning: could not determine address for localhost\n");
01988 return NULL;
01989 }
01990 return strdup(inet_ntoa(*((struct in_addr *) *(h->h_addr_list))));
01991 }
01992
01993 static int com_is_binary_listener(int socket) {
01994 return (socket == binaryListener);
01995 }
01996
01997 static int com_is_listening() {
01998 return stringListener > 0;
01999 }
02000
02001
02002
02009 static int com_read_events_into_buffer(double timeout) {
02010 int numberOfEventsAdded = 0;
02011 ICLTerm *ids = NULL;
02012 ICLTerm *connectedSockets = NULL;
02013 ICLListType *connectedSocketList = NULL;
02014
02015 struct timeval time;
02016 struct timeval* timep = NULL;
02017 int width = 0;
02018 fd_set read_set;
02019
02020
02021 if(timeout == 0 || timeout > 1) {
02022 time.tv_sec = 1;
02023 time.tv_usec = 0;
02024 timep = &time;
02025 }
02026 else if(timeout > 0) {
02027 long i = (long)floor(timeout);
02028 time.tv_sec = i;
02029 time.tv_usec = (long)floor((timeout - i) * 1e6);
02030 timep = &time;
02031 }
02032
02033 FD_ZERO(&read_set);
02034
02035
02036 if (com_is_listening()) {
02037 FD_SET(stringListener, &read_set);
02038 FD_SET(binaryListener, &read_set);
02039 }
02040
02041 com_find_all_connection_ids(&ids);
02042 com_find_all_connected_sockets(ids, &connectedSockets);
02043 connectedSocketList = icl_List(connectedSockets);
02044
02045
02046 while(icl_ListHasMoreElements(connectedSocketList)) {
02047 int socket = icl_Int(icl_ListElement(connectedSocketList));
02048 FD_SET(socket, &read_set);
02049 width = MAX(width, socket+1);
02050 connectedSocketList = icl_ListNextElement(connectedSocketList);
02051 }
02052
02053 if (select(width, &read_set, NULL, NULL, timep) <= 0) {
02054
02055 icl_Free(ids);
02056 icl_Free(connectedSockets);
02057 return 0;
02058 }
02059
02060 if (com_is_listening()) {
02061
02062 if (FD_ISSET(stringListener, &read_set)) {
02063 com_accept_new_connection(stringListener);
02064 FD_CLR(stringListener, &read_set);
02065 }
02066
02067 if (FD_ISSET(binaryListener, &read_set)) {
02068 com_accept_new_connection(binaryListener);
02069 FD_CLR(stringListener, &read_set);
02070 }
02071 }
02072
02073
02074 connectedSocketList = icl_List(connectedSockets);
02075 while(icl_ListHasMoreElements(connectedSocketList)) {
02076 int socket = icl_Int(icl_ListElement(connectedSocketList));
02077
02078
02079 if (FD_ISSET(socket, &read_set)) {
02080 ICLTerm *event = NULL;
02081 double totalTime = 0;
02082 while(((timeout == 0) || (totalTime < timeout)) &&
02083 com_select_event_from_socket(socket, 0.01, &event)) {
02084 com_add_event_to_buffer(event);
02085 ++numberOfEventsAdded;
02086 totalTime += 0.01;
02087 if(icl_IsStruct(event) &&
02088 (icl_NumTerms(event) == 1) &&
02089 (strcmp(icl_Functor(event), "event") == 0)) {
02090 ICLTerm* firstArg = icl_NthTerm(event, 1);
02091 if(icl_IsStr(firstArg) &&
02092 (strcmp(icl_Str(firstArg), "timeout") == 0)) {
02093 break;
02094 }
02095 }
02096 }
02097 }
02098 connectedSocketList = icl_ListNextElement(connectedSocketList);
02099 }
02100
02101 icl_Free(ids);
02102 icl_Free(connectedSockets);
02103 return numberOfEventsAdded;
02104 }
02105
02106 static int com_select_event_from_buffer(ICLTerm **event) {
02107 ICLListType *eventList = NULL;
02108
02109
02110 checkEventBuffer();
02111 if (icl_ListLen(eventBuffer) == 0 ) {
02112 *event = NULL;
02113 return FALSE;
02114 }
02115
02116
02117 eventList = icl_List(eventBuffer);
02118 *event = eventList->elt;
02119 eventBuffer->p = eventList->next;
02120 #ifdef NORMAL_GC
02121 memset(eventList, 0, sizeof(ICLListType));
02122 #endif
02123 free(eventList);
02124
02125 return TRUE;
02126 }
02127
02132 static int com_select_event_from_socket(int socketNumber, double timeout, ICLTerm **event){
02133 ICLTerm* readerTerm;
02134 ICLTerm* query;
02135 TermReader* reader;
02136 char* readerString;
02137 char *inConnectionId;
02138 inConnectionId = com_get_connection_id_from_socket(socketNumber);
02139 query = icl_NewStruct("reader", 1, icl_NewVar("_"));
02140 if(!com_GetInfo(inConnectionId, query, &readerTerm)) {
02141 icl_Free(query);
02142 fprintf(stderr, "com_select_event_from_socket could not find reader\n");
02143 icl_stFree(inConnectionId);
02144 return FALSE;
02145 }
02146 readerString = icl_Str(icl_NthTerm(readerTerm, 1));
02147 readerString = strdup(readerString);
02148 icl_stRemoveQuotes(readerString);
02149 reader = (TermReader*)stringToPointer(readerString);
02150 free(readerString);
02151 CHECK_LEAKS();
02152 *event = termReader_getNextTerm(reader, timeout);
02153 CHECK_LEAKS();
02154 icl_Free(query);
02155 icl_Free(readerTerm);
02156 if(*event == NULL) {
02157 int te = termReader_getError(reader);
02158
02159 if ( (te != TERMREADER_OKAY) &&
02160 (te != TERMREADER_TIMEOUT) &&
02161 (te != TERMREADER_READERR) &&
02162 (te != TERMREADER_NOCONN) ) {
02163 fprintf(stderr,
02164 "com_select_event_from_socket error in reading term: %i\n", te);
02165 }
02166
02167 if ( (te != TERMREADER_OKAY) &&
02168 (te != TERMREADER_TIMEOUT) ) {
02169 com_Disconnect(inConnectionId);
02170
02171
02172 }
02173 icl_stFree(inConnectionId);
02174 return FALSE;
02175 }
02176 else {
02177
02178 if (!strcmp(icl_Functor(*event),"term")) {
02179 ICLTerm *CID = icl_NewStruct("connection_id",1,icl_NewStr(inConnectionId));
02180 if (icl_NumTerms(*event) == 1) {
02181 icl_AddToList(icl_NthTerm(icl_NthTerm(*event,1),2), CID, TRUE);
02182 }
02183 else if (icl_NumTerms(*event) == 2) {
02184 icl_AddToList(icl_NthTerm(icl_NthTerm(*event,2),2), CID, TRUE);
02185 }
02186 }
02187 icl_stFree(inConnectionId);
02188 return TRUE;
02189 }
02190 }
02191
02192 static ICLTerm* initiatedTerm = NULL;
02196 static void sendHeartbeats()
02197 {
02198 ICLTerm* initiatedConnections = NULL;
02199 ICLListType* listElem;
02200
02201 if(!initiatedTerm) {
02202 initiatedTerm = icl_NewStruct("initiated", 1, icl_NewStr("true"));
02203 }
02204
02205 com_getMatchingInfos(initiatedTerm, &initiatedConnections);
02206
02207 for(listElem = icl_List(initiatedConnections); listElem; listElem = icl_ListNext(listElem)) {
02208 char* connectionId = icl_Str(icl_NthTerm(icl_ListElt(listElem), 1));
02209 if(oaa_SupportsSequenceNumbers(connectionId)) {
02210 ICLTerm* heartbeatEvent = icl_NewStruct("event", 2, icl_NewStr("ev_heartbeat"), icl_NewList(NULL));
02211 com_SendTerm(connectionId, heartbeatEvent);
02212 icl_Free(heartbeatEvent);
02213 }
02214 }
02215
02216 icl_Free(initiatedConnections);
02217 }
02218
02222 static void checkForFailedHeartbeats()
02223 {
02224 ICLTerm* initiatedConnections = NULL;
02225 ICLListType* listElem;
02226 GTimeVal now;
02227 glong nowSec;
02228
02229 g_get_current_time(&now);
02230 nowSec = now.tv_sec;
02231
02232 com_getMatchingInfos(initiatedTerm, &initiatedConnections);
02233
02234 for(listElem = icl_List(initiatedConnections); listElem; listElem = icl_ListNext(listElem)) {
02235 ICLTerm* lastHeartbeat = NULL;
02236 ICLTerm* infoList = icl_NthTerm(icl_ListElt(listElem), 4);
02237 if(!icl_ParamValue("last_heartbeat", NULL, infoList, &lastHeartbeat)) {
02238 continue;
02239 }
02240 else {
02241 char* connectionId = icl_Str(icl_NthTerm(icl_ListElt(listElem), 1));
02242 glong prevHeartbeatSec = icl_Int(icl_NthTerm(lastHeartbeat, 1));
02243 if(nowSec - prevHeartbeatSec > HEARTBEAT_TIMEOUT) {
02244 ICLTerm* socketTerm;
02245 int socket;
02246 if(!icl_ParamValue("connection", NULL, infoList, &socketTerm)) {
02247 fprintf(stderr, "Unable to determine socket for failed connection %s\n", connectionId);
02248 return;
02249 }
02250 if(!(socket = icl_Int(icl_NthTerm(socketTerm, 1)))) {
02251 char* socketTermString = icl_NewStringFromTerm(socketTerm);
02252 fprintf(stderr, "Unable to determine socket from socket term %s\n", socketTermString);
02253 icl_stFree(socketTermString);
02254 return;
02255 }
02256 close_socket(socket);
02257 {
02258 ICLTerm* senderQuery = icl_NewStruct("sender", 1, icl_NewVar("_"));
02259 ICLTerm* senderTerm = NULL;
02260 if(!com_GetInfo(connectionId, senderQuery, &senderTerm)) {
02261 fprintf(stderr, "Unable to find sender pointer for connection %s\n", connectionId);
02262 }
02263 if(senderTerm) {
02264 char *pointerStr = strdup(icl_Str(icl_NthTerm(senderTerm, 1)));
02265 TermSender* ts;
02266 icl_stRemoveQuotes(pointerStr);
02267 ts = (TermSender*)stringToPointer(pointerStr);
02268 free(ts);
02269 free(pointerStr);
02270 com_RemoveInfo(connectionId, senderQuery);
02271 }
02272 icl_Free(senderQuery);
02273 }
02274 {
02275 ICLTerm* readerQuery = icl_NewStruct("reader", 1, icl_NewVar("_"));
02276 ICLTerm* readerTerm = NULL;
02277 if(!com_GetInfo(connectionId, readerQuery, &readerTerm)) {
02278 fprintf(stderr, "Unable to find reader pointer for connection %s\n", connectionId);
02279 }
02280 if(readerTerm) {
02281 char *pointerStr = strdup(icl_Str(icl_NthTerm(readerTerm, 1)));
02282 TermSender* ts;
02283 icl_stRemoveQuotes(pointerStr);
02284 ts = (TermSender*)stringToPointer(pointerStr);
02285 free(ts);
02286 free(pointerStr);
02287 com_RemoveInfo(connectionId, readerQuery);
02288 }
02289 icl_Free(readerQuery);
02290 }
02291 com_RemoveInfo(connectionId, lastHeartbeat);
02292 {
02293 ICLTerm* reconnectTerm = icl_NewStruct("needs_reconnect", 1, icl_NewStr("true"));
02294 com_UpdateInfo(connectionId, reconnectTerm);
02295 }
02296 printf("Failed connection %s\n", connectionId);
02297 }
02298 icl_Free(lastHeartbeat);
02299 }
02300 }
02301 if(initiatedConnections) icl_Free(initiatedConnections);
02302 }
02303
02304 static void tryReconnects()
02305 {
02306 ICLTerm* reconnectConnections = NULL;
02307
02308 ICLTerm* reconnectTerm = icl_NewStruct("needs_reconnect", 1, icl_NewStr("true"));
02309 ICLTerm* otherAddressMatcher = icl_NewStruct("other_address", 1, icl_NewVar("_"));
02310 ICLTerm* myAddressMatcher = icl_NewStruct("oaa_address", 1, icl_NewVar("_"));
02311 ICLTerm* otherSequenceMatcher = icl_NewStruct("other_sequence", 1, icl_NewVar("_"));
02312 ICLListType* listElem;
02313 ICLTerm* myNameMatcher = icl_NewStruct("oaa_name", 1, icl_NewVar("_"));
02314
02315 com_getMatchingInfos(reconnectTerm, &reconnectConnections);
02316 for(listElem = icl_List(reconnectConnections); listElem; listElem = icl_ListNext(listElem)) {
02317 char* connId = icl_Str(icl_NthTerm(icl_ListElt(listElem), 1));
02318 ICLTerm* otherAddress;
02319 ICLTerm* connectAddress;
02320 ICLTerm* myAddress;
02321 ICLTerm* otherSequence;
02322 ICLTerm* myName;
02323 int lastSeen;
02324
02325 com_RemoveInfo(connId, reconnectTerm);
02326 if(!icl_Member(otherAddressMatcher, icl_NthTerm(icl_ListElt(listElem), 4), &otherAddress)) {
02327 fprintf(stderr, "reconnect required for connection %s, but no other_address found\n", connId);
02328 goto cleanup;
02329 }
02330 connectAddress = icl_CopyTerm(icl_NthTerm(icl_NthTerm(otherAddress, 1), 1));
02331 if(!com_GetInfo(connId, myAddressMatcher, &myAddress)) {
02332 fprintf(stderr, "reconnect required for connection %s, but no oaa_address found\n", connId);
02333 goto cleanup;
02334 }
02335 if(!com_GetInfo(connId, myNameMatcher, &myName)) {
02336 fprintf(stderr, "reconnect required for connection %s, but no oaa_name found\n", connId);
02337 goto cleanup;
02338 }
02339 if(!com_GetInfo(connId, otherSequenceMatcher, &otherSequence)) {
02340 lastSeen = -1;
02341 }
02342 else {
02343 lastSeen = icl_Int(icl_NthTerm(otherSequence, 1));
02344 icl_Free(otherSequence);
02345 }
02346 if(com_Connect(connId, connectAddress)) {
02347 com_add_event_to_buffer(
02348 icl_NewStruct("term", 1,
02349 icl_NewStruct("event", 2,
02350 icl_NewStruct("ev_reconnected_needs_handshake", 5,
02351 icl_NewStr(connId),
02352 icl_NewStr(icl_Str(icl_NthTerm(myName, 1))),
02353 icl_NewStruct("reconnect", 1, icl_NewInt(lastSeen)),
02354 icl_NewStruct("other_address", 1, icl_CopyTerm(icl_NthTerm(myAddress, 1))),
02355 icl_NewStruct("other_name", 1, icl_NewStr(icl_Str(icl_NthTerm(myName, 1))))
02356 ),
02357 icl_NewList(NULL))));
02358 }
02359
02360 icl_Free(connectAddress);
02361 }
02362
02363 cleanup:
02364 icl_Free(reconnectConnections);
02365 icl_Free(otherAddressMatcher);
02366 icl_Free(myAddressMatcher);
02367 icl_Free(otherSequenceMatcher);
02368 icl_Free(myNameMatcher);
02369 icl_Free(reconnectTerm);
02370 }
02371
02372 int initializedLastFired = FALSE;
02373 static GTimeVal lastFired;
02374
02378 static void fireTimers()
02379 {
02380 if(!initializedLastFired) {
02381 g_get_current_time(&lastFired);
02382 initializedLastFired = TRUE;
02383 }
02384 else {
02385 GTimeVal now;
02386 guint64 udiff;
02387 g_get_current_time(&now);
02388 udiff = (now.tv_sec - lastFired.tv_sec) * 1000000 + (now.tv_usec - lastFired.tv_usec);
02389 if(udiff > 1000000) {
02390 lastFired.tv_sec = now.tv_sec;
02391 lastFired.tv_usec = now.tv_usec;
02392 }
02393 else {
02394 return;
02395 }
02396 }
02397
02398 sendHeartbeats();
02399 checkForFailedHeartbeats();
02400 tryReconnects();
02401 }
02402
02412 EXPORT_MSCPP
02413 int EXPORT_BORLAND
02414 com_SelectEventFromAllIds(double timeout, ICLTerm **event) {
02415 int answer = FALSE;
02416 checkEventBuffer();
02417
02418 if (com_select_event_from_buffer(event)) {
02419 CHECK_LEAKS();
02420 answer = TRUE;
02421 }
02422 else if (com_read_events_into_buffer(timeout)) {
02423 CHECK_LEAKS();
02424 answer = com_select_event_from_buffer(event);
02425 CHECK_LEAKS();
02426 }
02427 else {
02428 answer = FALSE;
02429 }
02430 fireTimers();
02431 CHECK_LEAKS();
02432 return answer;
02433 }
02434
/**
 * Produces the next identifier of the form "direct_client<N>".
 *
 * N comes from a function-local static counter that starts at 1 and is
 * advanced by one on every call.
 *
 * @return a copy of the identifier made with strdup() (so the caller is
 *         responsible for free()ing it), or NULL if strdup() fails
 */
static char* new_direct_client_id() {
  static int nextSuffix = 1;
  char idText[255];
  int suffix = nextSuffix;

  nextSuffix = suffix + 1;
  sprintf(idText, "direct_client%d", suffix);

  return strdup(idText);
}
02444
02445
/**
 * Creates a TCP socket (AF_INET, SOCK_STREAM) and binds it to the given
 * port on all local interfaces (INADDR_ANY).  The socket is only bound
 * here; it is not put into the listening state.
 *
 * On failure a diagnostic is printed to stdout and no descriptor is left
 * open.
 *
 * NOTE(review): close()/closesocket() are assumed to be declared by the
 * socket headers this file already pulls in -- confirm.  On _WINDOWS builds
 * Winsock reports errors through WSAGetLastError(), so the errno text may
 * not be meaningful there.
 *
 * @param port  port number in host byte order
 * @return the bound socket descriptor, or -1 if the socket could not be
 *         created or bound
 */
static int opentcpport(int port) {
  struct sockaddr_in addr;
  int socketFd;

  socketFd = socket(AF_INET, SOCK_STREAM, 0);
  if (socketFd < 0) {
    /* strerror(errno) rather than "%m", which only glibc's printf understands. */
    printf("opentcpport: socket open failure: %s\n", strerror(errno));
    return -1;
  }

  /* Start from an all-zero address, then fill in family, interface and port. */
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);

  if (bind(socketFd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
    /* Report first: closing the descriptor below may overwrite errno. */
    printf("opentcpport: bind failure on port %d: %s\n", port, strerror(errno));

    /* The socket is useless without a binding; release it instead of leaking it. */
#ifdef _WINDOWS
    closesocket(socketFd);
#else
    close(socketFd);
#endif
    return -1;
  }

  return socketFd;
}
02483
02487 static int tcpListenAtPort(int port) {
02488 char listenAtString[600];
02489 ICLTerm *listenAtTerm;
02490 int socketFd;
02491
02492
02493 if (!commdb)
02494 commdb = db_NewDB();
02495
02496 socketFd = opentcpport(port);
02497
02498 if (listen(socketFd, 5) < 0)
02499 {
02500 printf("listen() error: %m\n");
02501 }
02502
02503 if (socketFd >= 0) {
02504 sprintf(listenAtString, "tcp_listener(%d)", socketFd);
02505 }
02506 listenAtTerm = icl_NewTermFromString(listenAtString);
02507
02508 if (listenAtTerm) {
02509 db_Assert(commdb, listenAtTerm, ICL_EMPTY);
02510 icl_Free(listenAtTerm);
02511 }
02512 else {
02513 printf("Error! tcp_listener could not be constructed.\n");
02514 }
02515
02516 return socketFd;
02517 }
02518
02519