libcom_tcp.c

Go to the documentation of this file.
00001 /******************************************************************************
00002 %   File    : libcom_tcp.c
00003 %   Primary Authors  : Adam Cheyer, Didier Guzzoni
00004 %   Purpose : TCP instantiation of lowlevel communication primitives for OAA
00005 %         Works for both Microsoft Windows (1st half) and UNIX (2nd half)
00006 %   Updated : 01/98
00007 %
00008 %*/
00009 /*
00010  * Copyright (C) 2006  SRI International
00011  *
00012  * This library is free software; you can redistribute it and/or
00013  * modify it under the terms of the GNU Lesser General Public
00014  * License as published by the Free Software Foundation; either
00015  * version 2.1 of the License, or (at your option) any later version.
00016  *
00017  * This library is distributed in the hope that it will be useful,
00018  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00019  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00020  * Lesser General Public License for more details.
00021  *
00022  * You should have received a copy of the GNU Lesser General Public
00023  * License along with this library; if not, write to the Free Software
00024  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
00025  *
00026  * SRI International: 333 Ravenswood Ave, Menlo Park, CA 94025
00027  */
00028 
00029 #define EXPORT_BORLAND
00030 
00031 #ifdef IS_DLL
00032 #define EXPORT_MSCPP __declspec(dllexport)
00033 #else
00034 #define EXPORT_MSCPP
00035 #endif
00036 
00037 #define OAA_BINARY_PORT_OFFSET 10
00038 
00039 /****************************************************************************
00040  * Include files
00041  ****************************************************************************/
00042 #ifndef _WINDOWS
00043 #include <sys/param.h>  // used by com_get_localhost_ip_address()
00044 #endif
00045 #include <stdio.h>
00046 #include <stdarg.h>   /* Variable length argument lists */
00047 #include <stdlib.h>
00048 #include <string.h>
00049 #include <errno.h>
00050 #include <math.h>
00051 #include "libcom_tcp.h"
00052 #include "libdb.h"
00053 #include "libutils.h"
00054 #include "libicl_private.h"
00055 #include "libicl.h"
00056 #include "libicl_depr.h"
00057 #include "termreader.h"
00058 #include "stringtermreader.h"
00059 #include "binarytermreader.h"
00060 #include "termsender.h"
00061 #include "stringtermsender.h"
00062 #include "binarytermsender.h"
00063 #include "liboaa.h"
00064 
00065 extern const char* oaa_library_version_str;
00066 
00067 /****************************************************************************
00068  * Global variables and forward declarations
00069  ****************************************************************************/
00070 #ifndef STREQ
00071 #define STREQ(str1, str2) (strcmp((str1), (str2)) == 0)
00072 #endif
00073 
00074 int COM_BEST_FORMAT = 100;   /* format selector: com_Connect() passes this; comConnectFormat() then uses binary if the peer advertises it, else string */
00075 int COM_STRING_FORMAT = 1;   /* format selector: string (ASCII) term encoding */
00076 int COM_BINARY_FORMAT = 2;   /* format selector: binary term encoding (used only if the peer reports a "binary" port) */

00078 #ifdef NORMAL_GC   /* debugging build: remember every reader/sender created by comConnectFormat() "so we don't lose the pointer" */
00079 static void* termReaders[128];   /* fixed-size table of TermReader pointers stored by comConnectFormat() */
00080 static void* termSenders[128];   /* fixed-size table of TermSender pointers stored by comConnectFormat() */
00081 static int termReadersIdx = -1;  /* index of the last used slot in termReaders; -1 means empty */
00082 static int termSendersIdx = -1;  /* index of the last used slot in termSenders; -1 means empty */
00083 /*
00084 static GPtrArray* termReaders = NULL;
00085 static GPtrArray* termSenders = NULL;
00086 */
00087 #endif

00089 /* Communication database: holds the com_connection_info(...) facts asserted by
   com_set_connect_info() and com_ListenAt(); created on demand with db_NewDB(). */
00090 ICLDatabase *commdb = NULL;

00097 static ICLTerm *eventBuffer = NULL;   /* NOTE(review): presumably the queue used by the com_*_event_*_buffer helpers declared below -- confirm */

00104 static int stringListener = -1;   /* listening socket for the string protocol; set by com_ListenAt() from tcpListenAtPort(stringPort) */

00111 static int binaryListener = -1;   /* listening socket for the binary protocol; set by com_ListenAt() from tcpListenAtPort(binaryPort) */

00118 static int stringPort = -1;   /* port number taken from the tcp(Host,Port) address given to com_ListenAt() */

00125 static int binaryPort = -1;   /* always stringPort + OAA_BINARY_PORT_OFFSET (see com_ListenAt()) */

00127 static int nextStartingSeqNum = 0;   /* consulted by getNextStartingSeqnum() when deciding whether to seed rand() */

00129 static const int HEARTBEAT_TIMEOUT = 10;   /* NOTE(review): unit is not shown in this part of the file (seconds?) -- confirm at the point of use */
00130 
00131 /* Forward definitions */
00132 int com_ask_about_tcp_exception(int Port, char *Host);
00133 void    tcpShutdown(SOCKET connection);
00134 SOCKET  tcpConnect(char *hostName, int port);
00135 // Timeout given in seconds
00136 int tcpSelect(SOCKET socket, double timeout);
00137 EXTERN void comPrintError(char *str, ...);
00138 
00139 // new functions for direct_connect
00140 static void checkEventBuffer(void);
00141 static void com_accept_new_connection(int listenersocket);
00142 static int com_add_event_to_buffer(ICLTerm *event);
00143 static int com_disconnect_all_connection_ids();
00144 static int com_find_all_connected_sockets(ICLTerm *ids, ICLTerm **sockets);
00145 static int com_find_all_connection_ids(ICLTerm **ids);
00146 static char *com_get_connection_id_from_socket(int socket);
00147 static char *com_get_localhost_ip_address(void);
00148 static int com_is_binary_listener(int socket);
00149 static int com_is_listening(void);
00150 static int com_read_events_into_buffer(double timeout);
00151 static int com_select_event_from_buffer(ICLTerm **event);
00152 static int com_select_event_from_socket(int socketNumber, double timeout, ICLTerm **event);
00153 static char* new_direct_client_id(void);
00154 static int opentcpport(int port);
00155 static int tcpListenAtPort(int port);
00156 static struct hostent* getHostEntry(char const* hostName);
00157 
00158 static int close_socket(int socket);
00159 
00160 #ifdef _WINDOWS
00161 /****************************************************************************
00162  * Definitions for MICROSOFT WINDOWS (Unix follows)
00163  ****************************************************************************/
00164 #include <windows.h>
00165 #include <winsock.h>
00166 int tcp_started = 0;
00167 int EXPORT_MSCPP tcpStartup();
00168 #include "oaa-windows.h"
00169 #else
00170 /****************************************************************************
00171  * Definitions for UNIX (Windows above)
00172  ****************************************************************************/
00173 #include <stdio.h>
00174 
00175 #include <sys/types.h>
00176 #include <sys/socket.h>
00177 #include <netinet/in.h>
00178 #include <arpa/inet.h>  /* inet_addr        */
00179 #include <netdb.h>
00180 #include <strings.h>    /* bzero, bcopy     */
00181 #include <unistd.h>     /* close        */
00182 #include <sys/time.h>   /* For polling time */
00183 #include <string.h>
00184 
00185 #endif
00186 
/*
 * Close a socket descriptor with the platform's native call.
 *
 * socket: descriptor to close.
 * Returns the value of closesocket() on Windows builds and of close()
 * everywhere else.
 */
static int close_socket(int socket)
{
  int status;

#ifndef _WINDOWS
  status = close(socket);
#else
  status = closesocket(socket);
#endif

  return status;
}
00195 
00196 static int getNextStartingSeqnum()
00197 {
00198   int result;
00199   if(nextStartingSeqNum == 0) {
00200     srand(1023);
00201   }
00202   result = rand();
00203   return (result < 0 ? -result : result) | 2;
00204 }
00205 
00217 EXPORT_MSCPP
00218 SOCKET EXPORT_BORLAND
00219 com_Connect(char *ConnectionId, ICLTerm *Address)
00220 {
00221   return comConnectFormat(ConnectionId, ICL_EMPTY, Address, COM_BEST_FORMAT);
00222 }
00223 
00224 char const* commonStart = "event(ev_connect([";
00225 char const* commonEnd = "query_formats(_),other_name('libcom_tcp_formatRequest_shouldDisconnect'),other_type(client)]),[])";
00226 char const* otherVersionStart = "other_version(";
00227 char const* otherVersionEnd = ")";
00228 char const* passwordStart = "password(";
00229 char const* passwordEnd = ")";
00230 char const* comma = ",";
00231 
00232 static ICLTerm* createEvConnectEvent(char const* password, ICLTerm* hostTerm)
00233 {
00234   char* tempString;
00235   char* host = icl_NewStringFromTerm(hostTerm);
00236   ICLTerm* result;
00237   int tempStringLen = 
00238     strlen(commonStart) + 
00239     strlen(otherVersionStart) + strlen(oaa_library_version_str) + strlen(otherVersionEnd) + strlen(comma) +
00240     (password ? strlen(passwordStart) + strlen(password) + strlen(passwordEnd) + strlen(comma) : 0) +
00241     strlen(commonEnd) + 1;
00242 
00243   tempString = (char*)malloc(tempStringLen);
00244   snprintf(tempString, tempStringLen, "%s%s%s%s%s%s%s%s%s%s", 
00245            commonStart, 
00246            otherVersionStart, oaa_library_version_str, otherVersionEnd, comma,
00247            (password ? passwordStart : ""), (password ? password : ""), (password ? passwordEnd : ""), (password ? comma : ""),
00248            commonEnd);
00249   result = icl_NewTermFromString(tempString);
00250   free(tempString);
00251   icl_stFree(host);
00252   return result;
00253 }
00254 
00255 static void com_set_connect_info(char* connectionId, ICLTerm* params, ICLTerm* host, ICLTerm* port, ICLTerm* readerInfo, ICLTerm* senderInfo, SOCKET connection)
00256 {
00257   ICLTerm *canonicalOtherAddress = NULL;
00258   ICLTerm *newConnectionInfo = NULL;
00259   ICLTerm *tempList = icl_NewList(NULL);
00260   ICLTerm *atmp;
00261   ICLTerm *templateTcpAddr = icl_NewStruct("tcp", 2, icl_CopyTerm(host), icl_CopyTerm(port));
00262   ICLTerm *typeMatcher = icl_NewStruct("type", 1, icl_NewVar("_"));
00263 
00264   icl_AddToList(tempList, readerInfo, FALSE);
00265   icl_AddToList(tempList, senderInfo, FALSE);
00266 
00267 
00268   if(!com_GetInfo(connectionId, typeMatcher, NULL)) {
00269     int startingSequenceNumber = getNextStartingSeqnum();
00270     ICLTerm* cHost;
00271     ICLTerm* cPort;
00272     icl_AddToList(tempList, icl_NewStruct("connection", 1, icl_NewInt(connection)),FALSE);
00273     icl_AddToList(tempList, icl_NewStruct("initiated", 1, icl_NewStr("true")), FALSE);
00274     icl_AddToList(tempList, icl_NewStruct("sequence", 1, icl_NewInt(startingSequenceNumber)), FALSE);
00275     icl_AddToList(tempList, icl_NewStruct("other_acked", 1, icl_NewInt(startingSequenceNumber - 1)), FALSE);
00276 
00277     // params added to call for direct_connect so that we
00278     // could get the "correct" other_address into the com_connection_info...
00279     {
00280       ICLTerm *tmp1;
00281       ICLTerm *tmp2;
00282       tmp1 = icl_NewStruct("other_address", 1, icl_NewVar("_"));
00283       if (icl_GetParamValue(tmp1, params, &tmp2)) {
00284         icl_AddToList(tempList, tmp2, TRUE);
00285       }
00286       else {
00287         cnx_CanonicalAddress(templateTcpAddr, connection, &canonicalOtherAddress);
00288         icl_AddToList(tempList, icl_NewStruct("other_address", 1, canonicalOtherAddress), FALSE);
00289       }
00290       icl_Free(tmp1);
00291     }
00292 
00293     cHost = icl_CopyTerm(host);
00294     cPort = icl_CopyTerm(port);
00295     atmp = icl_NewStruct("tcp", 2, cHost, cPort);
00296 
00297     newConnectionInfo = icl_NewStruct("com_connection_info", 5,
00298         icl_NewStr(connectionId),
00299         atmp, /*
00300           icl_NewStruct("tcp", 2, cHost, cPort),*/
00301         icl_NewStr("client"),
00302         tempList,
00303         icl_NewStr("connected"));
00304 
00305     {
00306       char* format = get64BitFormatWrapped("libcom_tcp com_Connect() Connected to %s ", ".");
00307       printf(format, icl_Str(host), icl_Int(port));
00308       printf("\n");
00309       free(format);
00310     }
00311 
00312     if (newConnectionInfo) {
00313       db_Assert(commdb, newConnectionInfo, ICL_EMPTY);
00314       /* DEBUG */
00315       /* db_PrintDB(commdb);*/
00316       icl_Free(newConnectionInfo);
00317     }
00318     else {
00319       fprintf(stderr, "Error! %s:%i com_connection_info could not be constructed.\n", __FILE__, __LINE__);
00320     }
00321   }
00322   else {
00323     icl_AddToList(tempList, icl_NewStruct("connection", 1, icl_NewInt(connection)),FALSE);
00324 
00325     // params added to call for direct_connect so that we
00326     // could get the "correct" other_address into the com_connection_info...
00327     {
00328       ICLTerm *tmp1;
00329       ICLTerm *tmp2;
00330       tmp1 = icl_NewStruct("other_address", 1, icl_NewVar("_"));
00331       if (icl_GetParamValue(tmp1, params, &tmp2)) {
00332         icl_AddToList(tempList, tmp2, TRUE);
00333       }
00334       else {
00335         cnx_CanonicalAddress(templateTcpAddr, connection, &canonicalOtherAddress);
00336         icl_AddToList(tempList, icl_NewStruct("other_address", 1, canonicalOtherAddress), FALSE);
00337       }
00338       icl_Free(tmp1);
00339     }
00340 
00341     com_UpdateInfo(connectionId, icl_NewStruct("protocol", 1, icl_NewStruct("tcp", 2, icl_CopyTerm(host), icl_CopyTerm(port))));
00342     com_UpdateInfo(connectionId, icl_NewStruct("type", 1, icl_NewStr("client")));
00343     com_UpdateInfo(connectionId, icl_NewStruct("status", 1, icl_NewStr("connected")));
00344     com_UpdateInfo(connectionId, tempList);
00345     {
00346       char* format = get64BitFormatWrapped("libcom_tcp com_Connect() Connected to %s ", ".");
00347       printf(format, icl_Str(host), icl_Int(port));
00348       printf("\n");
00349       free(format);
00350     }
00351   }
00352   icl_Free(templateTcpAddr);
00353   icl_Free(typeMatcher);
00354 }
00355 
00356 EXPORT_MSCPP
00357 SOCKET EXPORT_BORLAND
00358 comConnectFormat(char* ConnectionId, ICLTerm* Params,
00359                  ICLTerm* Address, int format)
00360 {
00361   ICLTerm *connectionInfo = NULL, *host = NULL, *port = NULL;
00362   ICLTerm *t1;
00363   SOCKET connection = 0;
00364   SOCKET oldConnection = 0;
00365   ICLTerm *readerInfo = NULL;
00366   ICLTerm *senderInfo = NULL;
00367   int res;
00368   ICLTerm* usePassword = NULL;
00369   ICLTerm* passwordTerm = NULL;
00370   char* password = NULL;
00371 
00372 #ifdef _WINDOWS
00373   if (!tcp_started)
00374     tcp_started = tcpStartup();
00375 #endif
00376 
00377   /* DEBUG */
00378   /* printf("Address %s\n", icl_NewStringFromTerm(Address)); */
00379 
00380   /* Make sure database is initialized */
00381   if (!commdb) {
00382     commdb = db_NewDB();
00383   }
00384 
00385   t1 = icl_NewStruct("tcp", 2, icl_NewVar("Host"), icl_NewVar("Port"));
00386   res = icl_Unify(Address, t1 ,&connectionInfo);
00387   if (ConnectionId && *ConnectionId && res) {
00388 
00389     /* connectionInfo = tcp(Host,Port) */
00390     host = icl_CopyTerm(icl_NthTerm(connectionInfo, 1));
00391     port = icl_CopyTerm(icl_NthTerm(connectionInfo, 2));
00392     icl_Free(connectionInfo);
00393 
00394     /* if variable address, look it up... */
00395     if (icl_IsVar(host) || icl_IsVar(port)) {
00396       ICLTerm* resolved_var = NULL;
00397 
00398       oaa_ResolveVariable("default_facilitator", &resolved_var);
00399       if (icl_IsVar(host)) {
00400   icl_Free(host);
00401   host = icl_CopyTerm(icl_NthTerm(resolved_var, 1));
00402       }
00403       if (icl_IsVar(port)) {
00404   icl_Free(port);
00405   port = icl_CopyTerm(icl_NthTerm(resolved_var, 2));
00406       }
00407       icl_Free(resolved_var);
00408     }
00409 
00410     /* by now, host and port should be known */
00411     if (host && port && !icl_IsVar(host) && !icl_IsVar(port))
00412       connection = tcpConnect(icl_Str(host), icl_Int(port));
00413 
00414     oaa_ResolveVariable("use_password", &usePassword);
00415 
00416     if(usePassword != NULL) {
00417       if(icl_IsStr(usePassword) && 
00418          (strlen(icl_Str(usePassword)) == 4) &&
00419          (strncmp(icl_Str(usePassword), "true", 4) == 0)) {
00420         oaa_ResolveVariable("client_password", &passwordTerm);
00421         if(passwordTerm != NULL) {
00422           password = icl_NewStringFromTerm(passwordTerm);
00423         }
00424         else {
00425           password = strdup("_");
00426         }
00427       }
00428     }
00429 
00430     if(connection > 0) {
00431       // create an initial connection to determine what formats are supported
00432       TermReader* initialTr = termReader_create();
00433       StringTermReader* str;
00434       TermSender* initialTs = termSender_create();
00435       StringTermSender* sts;
00436       ICLTerm* whatFormats = NULL;
00437       ICLTerm* reply = NULL;
00438       ICLTerm* resultList = NULL;
00439       gint64 realPort = icl_Int(port);
00440       TermReader* realTr = NULL;
00441       TermSender* realTs = NULL;
00442       char* readerPtr;
00443       char* readerInfoBuf;
00444       char* senderPtr;
00445       char* senderInfoBuf;
00446       whatFormats = createEvConnectEvent(password, host);
00447       if(!whatFormats) {
00448         printf("NULL whatFormats\n");
00449       }
00450       icl_stFree(password);
00451       password = NULL;
00452 
00453       str = stringTermReader_create(initialTr, connection);
00454       sts = stringTermSender_create(initialTs, connection);
00455       
00456       termSender_sendTerm(initialTs, whatFormats);
00457       icl_Free(whatFormats);
00458       /*
00459       printf("libcom_tcp.c sending whatFormats message sent\n");
00460       */
00461       if(termSender_getError(initialTs) != TERMSENDER_OKAY) {
00462         fprintf(stderr, "libcom_tcp com_Connect could not send formatRequest message: %i\n", termSender_getError(initialTs));
00463         close_socket(termSender_getSocket(initialTs));
00464         termSender_free(initialTs);
00465         termReader_free(initialTr);
00466         goto failedconnect;
00467       }
00468       reply = NULL;
00469       do {
00470         /*
00471         printf("libcom_tcp.c sending whatFormats waiting for reply...\n");
00472         */
00473         if(reply != NULL) {
00474           icl_Free(reply);
00475         }
00476         reply = termReader_getNextTerm(initialTr, 0);
00477         /*
00478         printf("libcom_tcp.c sending whatFormats waiting got a reply\n");
00479         */
00480       } while(termReader_getError(initialTr) == TERMREADER_TIMEOUT);
00481 
00482       if(termReader_getError(initialTr) != TERMREADER_OKAY) {
00483         fprintf(stderr, "libcom_tcp com_Connect could not read next term: %i\n", termReader_getError(initialTr));
00484         close_socket(termReader_getSocket(initialTr));
00485         termReader_free(initialTr);
00486         termSender_free(initialTs);
00487         goto failedconnect;
00488       }
00489 
00490       if(icl_IsStruct(icl_NthTerm(icl_NthTerm(icl_NthTerm(reply, 1), 1), 1))) {
00491         ICLTerm* resStruct = icl_NthTerm(icl_NthTerm(icl_NthTerm(reply, 1), 1), 1);
00492         char* functor = icl_Functor(resStruct);
00493         if(functor != NULL) {
00494           if((strlen(functor) == 9) &&
00495              (strncmp(functor, "exception", 9) == 0)) {
00496             char* exceptionData = icl_NewStringFromTerm(icl_NthTerm(resStruct, 1));
00497             fprintf(stderr, "Exception in comConnectFormat: %s\n", exceptionData);
00498             icl_stFree(exceptionData);
00499             goto failedconnect;
00500           }
00501         }
00502       }
00503 
00504       /*
00505        * save old socket so we can close it later--this is only done because the Prolog
00506        * facilitator gets mixed up if we close this connection before opening the new
00507        * one.
00508        */
00509       oldConnection = connection;
00510       resultList = icl_NthTerm(icl_NthTerm(icl_NthTerm(reply, 1), 1), 1);
00511       if(((format == COM_BEST_FORMAT) || (format == COM_BINARY_FORMAT))  &&
00512          (icl_ParamValueAsInt("binary", resultList, &realPort) == TRUE)) {
00513         BinaryTermReader* btr;
00514         BinaryTermSender* bts;
00515 
00516 
00517         printf("libcom_tcp com_Connect() using binary format\n");
00518 
00519         connection = tcpConnect(icl_Str(host), (int)realPort);
00520         realTr = termReader_create();
00521         btr = binaryTermReader_create(realTr, connection);
00522         realTs = termSender_create();
00523         bts = binaryTermSender_create(realTs, connection);
00524       }
00525       else { /* if(icl_ParamValueAsInt("ascii", resultList, &realPort)) */
00526         StringTermReader* str;
00527         StringTermSender* sts;
00528 
00529         /*
00530         printf("libcom_tcp com_Connect() using ascii format\n");
00531         */
00532         connection = tcpConnect(icl_Str(host), (int)icl_Int(port));
00533         realTr = termReader_create();
00534         str = stringTermReader_create(realTr, connection);
00535 #ifdef NORMAL_GC
00536         printf("created str (%p) from realTr(%p)\n", str, realTr);
00537 #endif
00538         CHECK_LEAKS();
00539         realTs = termSender_create();
00540         sts = stringTermSender_create(realTs, connection);
00541         CHECK_LEAKS();
00542       }
00543       CHECK_LEAKS();
00544 
00545       icl_Free(reply);
00546       CHECK_LEAKS();
00547 
00548       close_socket(oldConnection);
00549       termReader_free(initialTr);
00550       termSender_free(initialTs);
00551       CHECK_LEAKS();
00552 #ifdef NORMAL_GC
00553       /* so we don't lose the pointer */
00554       /*
00555       if(termReaders == NULL) {
00556         termReaders = g_ptr_array_new();
00557       }
00558       g_ptr_array_add(termReaders, realTr);
00559       */
00560       termReaders[++termReadersIdx] = realTr;
00561       CHECK_LEAKS();
00562 #endif
00563       CHECK_LEAKS();
00564       readerPtr = pointerToString(realTr);
00565       readerInfoBuf = (char*)malloc(strlen(readerPtr) + 10 + 1);
00566       CHECK_LEAKS();
00567       strncpy(readerInfoBuf, "reader('", 8);
00568       strncpy(readerInfoBuf + 8, readerPtr, strlen(readerPtr));
00569       readerInfoBuf[strlen(readerPtr) + 8] = '\'';
00570       readerInfoBuf[strlen(readerPtr) + 9] = ')';
00571       readerInfoBuf[strlen(readerPtr) + 10] = '\0';
00572       CHECK_LEAKS();
00573       readerInfo = icl_NewTermFromString(readerInfoBuf);
00574       free(readerPtr);
00575       free(readerInfoBuf);
00576       senderPtr = pointerToString(realTs);
00577 #ifdef NORMAL_GC
00578       /* so we don't lose the pointer */
00579       /*
00580       if(termSenders == NULL) {
00581         termSenders = g_ptr_array_new();
00582       }
00583       g_ptr_array_add(termSenders, realTs);
00584       */
00585       termSenders[++termSendersIdx] = realTs;
00586 #endif
00587       senderInfoBuf = (char*)malloc(strlen(senderPtr) + 10 + 1);
00588       strncpy(senderInfoBuf, "sender('", 8);
00589       strncpy(senderInfoBuf + 8, senderPtr, strlen(senderPtr));
00590       senderInfoBuf[strlen(senderPtr) + 8] = '\'';
00591       senderInfoBuf[strlen(senderPtr) + 9] = ')';
00592       senderInfoBuf[strlen(senderPtr) + 10] = '\0';
00593       senderInfo = icl_NewTermFromString(senderInfoBuf);
00594       free(senderPtr);
00595       free(senderInfoBuf);
00596     }
00597 
00598     if (connection>0) {
00599       com_set_connect_info(ConnectionId, Params, host, port, readerInfo, senderInfo, connection);
00600     }
00601     else {
00602       char* format = get64BitFormatWrapped("com_Connect: failed to connect to %s ", ".");
00603       fprintf(stderr, format, icl_Str(host), icl_Int(port));
00604       fprintf(stderr, "\n");
00605       free(format);
00606     }
00607     if (!host && !port)
00608       fprintf(stderr, "Make sure you have a valid setup.pl file\n");
00609   }
00610   else {
00611     char* addressString = icl_NewStringFromTerm(Address);
00612     fprintf(stderr, "%s Error!  Bad arguments to com_Connect(): connectionId = %s address = %s\n", __FILE__, ConnectionId, addressString);
00613     icl_stFree(addressString);
00614   }
00615 
00616   icl_Free(t1);
00617 #ifdef NORMAL_GC
00618   printf("Leaving comConnectFormat\n");
00619 #endif
00620 
00621   if(port) icl_Free(port);
00622   if(host) icl_Free(host);
00623   return connection;
00624 
00625 failedconnect:
00626   if(port) icl_Free(port);
00627   if(host) icl_Free(host);
00628   return -1;
00629 }
00630 
00637 EXPORT_MSCPP
00638 int EXPORT_BORLAND
00639 com_Disconnect(char *ConnectionId)
00640 {
00641   ICLTerm *t1, *info = NULL;
00642 
00643   if (ConnectionId == NULL) {
00644     return com_disconnect_all_connection_ids();
00645   }
00646 
00647   if (ConnectionId && *ConnectionId && commdb) {
00648     if (com_GetInfo(ConnectionId,
00649                     (t1 = icl_NewStruct("connection", 1,
00650                                         icl_NewVar("_"))), &info)) {
00651 
00652       tcpShutdown(icl_Int(icl_NthTerm(info, 1)));
00653       icl_Free(info);
00654       icl_Free(t1);
00655       db_Retract(commdb,
00656                  (t1 = icl_NewStruct("com_connection_info", 5,
00657                                      icl_NewStr(ConnectionId),
00658                                      icl_NewVar("TCP"),
00659                                      icl_NewVar("_Type"),
00660                                      icl_NewVar("_Info"),
00661                                      icl_NewStr("connected"))),
00662                  ICL_EMPTY);
00663     }
00664     icl_Free(t1);
00665     return TRUE;
00666   }
00667   else return FALSE;
00668 }
00669 
00673 EXPORT_MSCPP
00674 SOCKET EXPORT_BORLAND
00675 com_Connected(char* ConnectionId)
00676 {
00677   ICLTerm *t1;
00678 
00679   /* Make sure there's a valid connection id */
00680   if (ConnectionId && *ConnectionId) {
00681     ICLTerm *currentStatus = NULL;
00682     com_GetInfo(ConnectionId,
00683                 (t1 = icl_NewStruct("status", 1,
00684                                     icl_NewStr("connected"))),&currentStatus);
00685     icl_Free(t1);
00686     if (currentStatus && STREQ(icl_Str(currentStatus),"connected")) {
00687       icl_Free(currentStatus);
00688       return TRUE;
00689     }
00690   }
00691   return FALSE;
00692 }
00693 
00694 /****************************************************************************
00695  * name:    com_GetAllValidConnections
00696  * purpose:
00697  * remarks:
00698  ****************************************************************************/
00699 EXPORT_MSCPP
00700 int EXPORT_BORLAND
00701 com_GetAllValidConnections(ICLTerm** result)
00702 {
00703   int index;
00704   ICLTerm *resultFromDb = (ICLTerm *)NULL;
00705   static ICLTerm* info = NULL;
00706 
00707   if (!icl_IsValid(info)) {
00708     info = icl_NewTermFromData("com_connection_info(Id,Protocol,Type,InfoList,connected)",56);
00709   }
00710 
00711   *result = icl_NewList(NULL);
00712 
00713   if (db_Solve(commdb, info, ICL_EMPTY, &resultFromDb)) {
00714     /*
00715       Retreives all Id from the list of connected connections.
00716       Not the best optimized way to to it (n^2 order), but lists of connections
00717       are not going to be huge.
00718     */
00719 
00720     ICLTerm *item = NULL;
00721 
00722     /* DEBUG */
00723     /*
00724       printf("Result from db_Solve %s\n", icl_NewStringFromTerm(resultFromDb));
00725     */
00726 
00727     for (index = 1; index <= icl_ListLen(resultFromDb) ; index ++) {
00728       item = icl_NthTerm(resultFromDb, index);
00729       icl_AddToList(*result, icl_CopyTerm(icl_NthTerm(item, 1)), TRUE);
00730     }
00731     icl_Free(resultFromDb);
00732     return TRUE;
00733   }
00734   icl_Free(resultFromDb);
00735   return FALSE;
00736 };
00737 
00742 int com_GetConnectionFromInfo(ICLTerm *Info, char** connectionName)
00743 {
00744   ICLTerm *resultFromDb = (ICLTerm *)NULL;
00745   int      res = FALSE;
00746   static ICLTerm* info = NULL;
00747 
00748   if (!icl_IsValid(info)) {
00749     info = icl_NewTermFromData("com_connection_info(Conn,Protocol,Type,InfoList,Status)",55);
00750   }
00751 
00752   (void)Info;
00753 
00754   if (db_Solve(commdb, info, ICL_EMPTY, &resultFromDb)) {
00755     *connectionName =
00756       strdup(icl_Str(icl_NthTerm(icl_NthTerm(resultFromDb, 1),1)));
00757     res = TRUE;
00758     icl_Free(resultFromDb);
00759   }
00760   return res;
00761 }
00762 
00779 EXPORT_MSCPP
00780 int EXPORT_BORLAND
00781 com_ListenAt(char *ConnectionId, ICLTerm *Params, ICLTerm *Address)
00782 {
00783   ICLTerm *connectionInfo = NULL, *host = NULL, *port = NULL;
00784   ICLTerm *info = NULL, *t1 = NULL;
00785   ICLTerm *resolve_vars_false = icl_NewTermFromData("resolve_vars(false)",19);
00786   int do_resolve_vars = TRUE;
00787   char cmd[600];
00788   char *addressString = NULL;
00789   char *hostString = NULL;
00790   int res = FALSE;
00791   ICLTerm* addrFromCmdLine = NULL;
00792   ICLTerm* addrFromSetup = NULL;
00793   ICLTerm* addrFromEnv = NULL;
00794 
00795   // Make sure database is initialized
00796   if (!commdb)
00797     commdb = db_NewDB();
00798 
00799   t1 = icl_NewStruct("tcp", 2, icl_NewVar("Host"), icl_NewVar("Port"));
00800   if (ConnectionId && *ConnectionId && icl_Unify(Address, t1, &connectionInfo)) {
00801 
00802     // connectionInfo = tcp(Host,Port)
00803     host = icl_NthTerm(connectionInfo, 1);
00804     port = icl_NthTerm(connectionInfo, 2);
00805 
00806     // if variable address, look it up...
00807     do_resolve_vars = !icl_GetParamValue(resolve_vars_false, Params, NULL);
00808     icl_Free(resolve_vars_false);
00809     if (do_resolve_vars && (icl_IsVar(host) || icl_IsVar(port))) {
00810         oaa_ResolveVariable("oaa_connect", &addrFromSetup);   // setup file
00811         Address = addrFromSetup;
00812         addressString = getenv("OAA_CONNECT");         // env var
00813         if (addressString) {
00814           icl_Free(Address);
00815           addrFromEnv = icl_NewTermFromString(addressString);
00816           Address = addrFromEnv;
00817         }
00818         if(oaa_ResolveVariable("-oaa_connect", &addrFromCmdLine)) {  // cmd line
00819           icl_Free(Address);
00820           Address = addrFromCmdLine;
00821         }
00822     }
00823 
00824     icl_Free(connectionInfo);
00825 
00826     // by now, host and port should be known
00827     icl_Unify(Address, t1, &connectionInfo);
00828 
00829     // connectionInfo = tcp(Host,Port)
00830     host = icl_NthTerm(connectionInfo, 1);
00831     port = icl_NthTerm(connectionInfo, 2);
00832 
00833     if (!icl_IsVar(host) && !icl_IsVar(port))
00834 
00835       do {
00836         //
00837         hostString = strdup(icl_Str(host));
00838         if (!strcmp("localhost", hostString)) {
00839           icl_stFree(hostString);
00840           hostString = com_get_localhost_ip_address();
00841         }
00842         // stringPort, binaryPort, stringListener, and binaryListener
00843         // have file scope
00844         stringPort = icl_Int(port);
00845         binaryPort = stringPort + OAA_BINARY_PORT_OFFSET;
00846         stringListener = tcpListenAtPort(stringPort);
00847         binaryListener = tcpListenAtPort(binaryPort);
00848         if ( (stringListener > 0) && (binaryListener > 0) ) {
00849           sprintf(cmd, "com_connection_info('%s', tcp, server, \
00850             [oaa_address(addr(tcp('%s',%d))), oaa_host('%s'), oaa_port(%d)], \
00851             connected)",
00852                   ConnectionId, hostString, stringPort,
00853                   hostString, stringPort);
00854           info = icl_NewTermFromString(cmd);
00855 
00856           if (info) {
00857             db_Assert(commdb, info, ICL_EMPTY);
00858             icl_Free(info);
00859           }
00860           else {
00861             printf("Error! %s:%i com_connection_info could not be constructed.\n", __FILE__, __LINE__);
00862           }
00863           res = TRUE;
00864         }
00865         else {
00866           if (!com_ask_about_tcp_exception(icl_Int(port),
00867                                            icl_NewStringFromTerm(host))) {
00868             icl_Free(connectionInfo);
00869             icl_Free(t1);
00870             icl_stFree(hostString);
00871             return FALSE;
00872           }
00873         }
00874       } while (!res);
00875     icl_Free(connectionInfo);
00876   }
00877   else {
00878     printf("Error! Bad arguments to com_ListenAt()\n");
00879             db_Assert(commdb, info, ICL_EMPTY);
00880   }
00881   icl_Free(t1);
00882   icl_stFree(hostString);
00883   return res;
00884 } // end of com_ListenAt
00885 
00886 /****************************************************************************
00887  * Utility functions for com_ListenAt
00888  ****************************************************************************/
00889 
/**
 * Print the help text for the "unable to listen on port" prompt: why the
 * port may be unavailable and what answering "y" or "n" does.
 *
 * When _WINDOWS is defined the function prints nothing.
 */
void
com_print_tcp_exception_help() {
#ifndef _WINDOWS
  /* One entry per output line, emitted in order exactly as written. */
  static const char *const helpLines[] = {
    "I've just attempted to listen on the specified port, but was unable\n",
    "to gain control of it.  This could be because there's already a\n",
    "Facilitator, or some other program, making use of that port.  Or, it\n",
    "could be that a Facilitator using that port was just terminated.  In\n",
    "such cases, the port may be inaccessible for a brief period (usually\n",
    "only a few seconds, but sometimes more).  It may help to kill any\n",
    "client agents which may still be connected to the defunct Facilitator.\n",
    "If you think the specified port may now be accessible, enter \"y\" and\n",
    "I'll try again.  You may request retry any number of times.\n",
    "If you want me to listen on a different port, enter \"n\", which will\n",
    "cause me to terminate.  Then change your port specification (it's\n",
    "either in a setup file or an environment variable).  Then restart me.\n"
  };
  size_t lineIdx;

  for (lineIdx = 0; lineIdx < sizeof(helpLines) / sizeof(helpLines[0]); lineIdx++) {
    fputs(helpLines[lineIdx], stdout);
  }
#endif
}
00911 
/**
 * Interactively ask the user whether to retry access to a TCP port.
 *
 * Prompts on stdout and reads one whitespace-delimited word from stdin:
 *   "y"  -> retry,
 *   "n"  -> give up,
 *   "h"  -> print com_print_tcp_exception_help() and ask again.
 * Any other word repeats the prompt.
 *
 * @param Port  port number shown in the prompt
 * @param Host  host name shown in the prompt
 * @return 1 if the user answered "y"; 0 if the user answered "n", or if
 *         stdin hit end-of-file / a read error before an answer arrived
 */
int
com_ask_about_tcp_exception(int Port, char *Host) {

  char response[100];

  for (;;) {
    printf("Currently unable to access %s port %d.\n Try again? [y)es, n)o, h(elp] ", Host, Port);
    /* The prompt has no trailing newline, so push it out before blocking. */
    fflush(stdout);

    /*
     * Width-limited so a long word cannot overrun response[].  No trailing
     * "\n" in the format: whitespace in a scanf format keeps consuming
     * input until a non-blank character shows up, which would force the
     * user to type something extra before the answer is processed.
     */
    if (scanf("%99s", response) != 1) {
      /* EOF or read error: nobody is there to answer, so do not retry. */
      return 0;
    }

    if (strcmp(response, "y") == 0)
      return 1;
    if (strcmp(response, "n") == 0)
      return 0;
    if (strcmp(response, "h") == 0)
      com_print_tcp_exception_help();
  }
}
00934 
00939 int com_get_socket_from_connection_id(char* ConnectionId) {
00940   ICLTerm *info = NULL;
00941   int socketNumber = -1;
00942 
00943   /* DEBUG */
00944   /*
00945     if (commdb)
00946     db_PrintDB(commdb);
00947   */
00948 
00949   ICLTerm* requestTerm = icl_NewStruct("connection", 1, icl_NewVar("_"));
00950   if (ConnectionId && *ConnectionId && commdb &&
00951       com_GetInfo(ConnectionId, requestTerm, &info)) {
00952     icl_NthTermAsInt(info, 1, &socketNumber);
00953     icl_Free(info);
00954   }
00955   icl_Free(requestTerm);
00956 
00957   return socketNumber;
00958 }
00959 
00964 int basic_send(SOCKET inSocket, char *Data, int Size)
00965 {
00966   int res = FALSE;
00967 
00968   /* send data */
00969   if (inSocket>0) {
00970 #ifdef _WINDOWS  /* ---------------------------------------------------- */
00971     if (send(inSocket, Data, Size,0) == SOCKET_ERROR)
00972       res = -1;
00973 #else
00974     res = (write(inSocket, Data, Size) == Size);
00975 #endif  /* --------------------------------------------------------------- */
00976   }
00977   return res;
00978 }
00979 
00984 EXPORT_MSCPP
00985 int EXPORT_BORLAND
00986 com_SendData(char *ConnectionId, char *Data, int Size)
00987 {
00988   int res = FALSE;
00989   int socketNumber = com_get_socket_from_connection_id(ConnectionId);
00990 
00991   /* send data */
00992   if (socketNumber>0) {
00993     char *buffer = (char*)malloc(Size+8);    /* count \0 too */
00994     sprintf(buffer,"term(%s).", Data);
00995 
00996     /* DEBUG */
00997     /*
00998       printDebug(5, "Sending %s\n", buffer);
00999     */
01000 
01001     basic_send(socketNumber, buffer, strlen(buffer));
01002     buffer[0] = 10;
01003     basic_send(socketNumber, buffer, 1);
01004 
01005     free(buffer);
01006   }
01007   return res;
01008 }
01009 
01010 EXPORT_MSCPP
01011 int EXPORT_BORLAND
01012 com_SendTerm(char* connectionId, ICLTerm* term)
01013 {
01014   ICLTerm* result;
01015   ICLTerm* query = icl_NewStruct("sender", 1, icl_NewVar("_"));
01016   char* pointerStr;
01017   TermSender* ts;
01018   if(!com_GetInfo(connectionId, query, &result)) {
01019     icl_Free(query);
01020     return FALSE;
01021   }
01022 
01023   pointerStr = icl_Str(icl_NthTerm(result, 1));
01024   pointerStr = strdup(pointerStr);
01025   icl_stRemoveQuotes(pointerStr);
01026   ts = (TermSender*)stringToPointer(pointerStr);
01027   free(pointerStr);
01028   termSender_sendTerm(ts, term);
01029   if(termSender_getError(ts) != TERMSENDER_OKAY) {
01030     fprintf(stderr, "com_SendTerm(char*, ICLTerm*) did not send term\n");
01031     icl_Free(query);
01032     icl_Free(result);
01033     return FALSE;
01034   }
01035   icl_Free(query);
01036   icl_Free(result);
01037   return TRUE;
01038 }
01039 
01046 EXPORT_MSCPP
01047 int EXPORT_BORLAND
01048 com_AddInfo(char *ConnectionId, ICLTerm *NewInfo)
01049 {
01050   ICLTerm *info = NULL, *result = NULL;
01051   char cmd[200];
01052   int changed = FALSE;
01053   int res = FALSE;
01054 
01055   /* Make sure there's a valid connection id */
01056   if (!ConnectionId || !*ConnectionId || !commdb)
01057     return FALSE;
01058 
01059   sprintf(cmd, "com_connection_info('%s', Protocol, Type,InfoList,Status)",
01060           ConnectionId);
01061   info = icl_NewTermFromString(cmd);
01062 
01063   if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01064     ICLTerm *ninfo = NULL, *item;
01065     ICLTerm *protocol, *type, *infolist, *status;
01066     ICLTerm* templateProtocol =  icl_NewStruct("protocol", 1, icl_NewVar("_"));
01067     ICLTerm* templateStatus =  icl_NewStruct("status", 1, icl_NewVar("_"));
01068     ICLTerm* templateType =  icl_NewStruct("type", 1, icl_NewVar("_"));
01069 
01070     db_Retract(commdb, info, ICL_EMPTY);
01071     item = icl_NthTerm(result, 1);
01072 
01073     if (icl_Unify(templateProtocol, NewInfo, NULL)) {
01074       protocol = icl_NthTerm(NewInfo, 1);
01075       changed = TRUE;
01076     }
01077     else protocol = icl_NthTerm(item, 2);
01078 
01079     if (icl_Unify(templateType, NewInfo, NULL)) {
01080       type = icl_NthTerm(NewInfo, 1);
01081       changed = TRUE;
01082     }
01083     else type = icl_NthTerm(item, 3);
01084 
01085     if (icl_Unify(templateStatus, NewInfo, NULL)) {
01086       status = icl_NthTerm(NewInfo, 1);
01087       changed = TRUE;
01088     }
01089     else status = icl_NthTerm(item, 5);
01090 
01091     if (!changed) {
01092       if (icl_IsList(NewInfo)) {
01093         ICLListType *p = icl_List(NewInfo);
01094         while (p) {
01095           icl_AddToList(icl_NthTerm(item, 4), icl_CopyTerm(p->elt), TRUE);
01096           p = p->next;
01097         }
01098       }
01099       else
01100         icl_AddToList(icl_NthTerm(item, 4), icl_CopyTerm(NewInfo), TRUE);
01101     }
01102     infolist = icl_NthTerm(item, 4);
01103 
01104     ninfo = icl_NewStruct("com_connection_info", 5, icl_NewStr(ConnectionId),
01105                           icl_CopyTerm(protocol), icl_CopyTerm(type),
01106                           icl_CopyTerm(infolist), icl_CopyTerm(status));
01107 
01108     db_Assert(commdb, ninfo, ICL_EMPTY);
01109 
01110     icl_Free(ninfo);
01111     icl_Free(result);
01112     icl_Free(templateProtocol);
01113     icl_Free(templateStatus);
01114     icl_Free(templateType);
01115     res = TRUE;
01116   }
01117   else {
01118     res = FALSE;
01119   }
01120   icl_Free(info);
01121   return res;
01122 }
01123 
01128 EXPORT_MSCPP
01129 int EXPORT_BORLAND
01130 com_RemoveInfo(char *connectionId, ICLTerm *toMatch)
01131 {
01132   ICLTerm *info = NULL, *result = NULL;
01133   char cmd[200];
01134   int res = FALSE;
01135 
01136   /* Make sure there's a valid connection id */
01137   if (!connectionId || !*connectionId || !commdb)
01138     return FALSE;
01139   sprintf(cmd, "com_connection_info('%s', Protocol, Type,InfoList,Status)",
01140           connectionId);
01141   info = icl_NewTermFromString(cmd);
01142 
01143   if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01144     ICLTerm *ninfo = NULL, *item;
01145     ICLTerm *protocol, *existingList, *type, *status;
01146 
01147     db_Retract(commdb, info, ICL_EMPTY);
01148     item = icl_NthTerm(result, 1);
01149 
01150     protocol = icl_NthTerm(item, 2);
01151     type = icl_NthTerm(item, 3);
01152     status = icl_NthTerm(item, 5);
01153 
01154     existingList = icl_NthTerm(item, 4);
01155     if (icl_IsList(toMatch)) {
01156       ICLListType *p;
01157       for(p = icl_List(toMatch); p; p = p->next) {
01158         ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(p->elt);
01159         icl_RemoveUnifyingFromList(existingList, unifier, TRUE);
01160         icl_FreeTerm(unifier);
01161       }
01162     }
01163     else {
01164       ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(toMatch);
01165       icl_RemoveUnifyingFromList(existingList, unifier, TRUE);
01166       icl_FreeTerm(unifier);
01167     }
01168 
01169     ninfo = icl_NewStruct("com_connection_info", 5, icl_NewStr(connectionId),
01170                           icl_CopyTerm(protocol), icl_CopyTerm(type),
01171                           icl_CopyTerm(existingList), icl_CopyTerm(status));
01172 
01173     db_Assert(commdb, ninfo, ICL_EMPTY);
01174 
01175     icl_Free(ninfo);
01176     icl_Free(result);
01177     res = TRUE;
01178   }
01179   else {
01180     res = FALSE;
01181   }
01182   icl_Free(info);
01183   return res;
01184   
01185 }
01186 
01193 EXPORT_MSCPP
01194 int EXPORT_BORLAND
01195 com_UpdateInfo(char *ConnectionId, ICLTerm *NewInfo)
01196 {
01197   ICLTerm *info = NULL, *result = NULL;
01198   char cmd[200];
01199   int changed = FALSE;
01200   int res = FALSE;
01201 
01202   /* Make sure there's a valid connection id */
01203   if (!ConnectionId || !*ConnectionId || !commdb)
01204     return FALSE;
01205   sprintf(cmd, "com_connection_info('%s', Protocol, Type,InfoList,Status)",
01206           ConnectionId);
01207   info = icl_NewTermFromString(cmd);
01208 
01209   if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01210     ICLTerm *ninfo = NULL, *item;
01211     ICLTerm *protocol, *existingList, *type, *status;
01212     ICLTerm* templateProtocol =  icl_NewStruct("protocol", 1, icl_NewVar("_"));
01213     ICLTerm* templateStatus =  icl_NewStruct("status", 1, icl_NewVar("_"));
01214     ICLTerm* templateType =  icl_NewStruct("type", 1, icl_NewVar("_"));
01215 
01216     db_Retract(commdb, info, ICL_EMPTY);
01217     item = icl_NthTerm(result, 1);
01218 
01219     if (icl_Unify(templateProtocol, NewInfo, NULL)) {
01220       protocol = icl_NthTerm(NewInfo, 1);
01221       changed = TRUE;
01222     }
01223     else protocol = icl_NthTerm(item, 2);
01224 
01225     if (icl_Unify(templateType, NewInfo, NULL)) {
01226       type = icl_NthTerm(NewInfo, 1);
01227       changed = TRUE;
01228     }
01229     else type = icl_NthTerm(item, 3);
01230 
01231     if (icl_Unify(templateStatus, NewInfo, NULL)) {
01232       status = icl_NthTerm(NewInfo, 1);
01233       changed = TRUE;
01234     }
01235     else status = icl_NthTerm(item, 5);
01236 
01237     existingList = icl_NthTerm(item, 4);
01238     if (!changed) {
01239       if (icl_IsList(NewInfo)) {
01240         ICLListType *p;
01241         for(p = icl_List(NewInfo); p; p = p->next) {
01242           ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(p->elt);
01243           int numReplaced = icl_ReplaceUnifying(existingList, unifier, p->elt, TRUE);
01244 
01245           if(numReplaced < 1) {
01246             icl_AddToList(existingList, icl_CopyTerm(p->elt), TRUE);
01247           }
01248           icl_FreeTerm(unifier);
01249         }
01250       }
01251       else {
01252         ICLTerm* unifier = icl_GenerateSimpleUnifyingTerm(NewInfo);
01253         int numReplaced = icl_ReplaceUnifying(existingList, unifier, NewInfo, TRUE);
01254         if(numReplaced < 1) {
01255           icl_AddToList(existingList, icl_CopyTerm(NewInfo), TRUE);
01256         }
01257         icl_FreeTerm(unifier);
01258       }
01259     }
01260 
01261     ninfo = icl_NewStruct("com_connection_info", 5, icl_NewStr(ConnectionId),
01262                           icl_CopyTerm(protocol), icl_CopyTerm(type),
01263                           icl_CopyTerm(existingList), icl_CopyTerm(status));
01264 
01265     db_Assert(commdb, ninfo, ICL_EMPTY);
01266 
01267     icl_Free(ninfo);
01268     icl_Free(result);
01269     icl_Free(templateProtocol);
01270     icl_Free(templateStatus);
01271     icl_Free(templateType);
01272     res = TRUE;
01273   }
01274   else {
01275     res = FALSE;
01276   }
01277   icl_Free(info);
01278   return res;
01279 }
01280 
01281 static void com_getMatchingInfos(ICLTerm const*info, ICLTerm **result)
01282 {
01283   ICLTerm* allMatcher = icl_NewStruct("com_connection_info", 5, icl_NewVar("_"), icl_NewVar("_"), icl_NewVar("_"), icl_NewVar("_"), icl_NewVar("_"));
01284   ICLTerm* allMatches = NULL;
01285   *result = icl_NewList(NULL);
01286   if(db_Solve(commdb, allMatcher, ICL_EMPTY, &allMatches)) {
01287     // each entry in allMatches is a com_connection_info struct, and we need to check for the
01288     // the presence of things that unify with the given ICLTerm* info
01289     ICLListType* listElem;
01290     for(listElem = icl_List(allMatches); listElem; listElem = icl_ListNext(listElem)) {
01291       if(icl_Member(info, icl_NthTerm(icl_ListElt(listElem), 4), NULL)) {
01292         icl_AddToList(*result, icl_CopyTerm(icl_ListElt(listElem)), FALSE);
01293       }
01294     }
01295     icl_Free(allMatches);
01296   }
01297   icl_Free(allMatcher);
01298 }
01299 
01304 EXPORT_MSCPP
01305 int EXPORT_BORLAND
01306 com_GetInfo(char *ConnectionId, ICLTerm *GInfo, ICLTerm **Result)
01307 {
01308   ICLTerm *info = NULL, *result = NULL;
01309   char cmd[200];
01310   int res = FALSE;
01311 
01312   /* DEBUG */
01313   /* printf("Comm get info for %s\n", icl_NewStringFromTerm(GInfo));*/
01314 
01315 
01316   /* Make sure there's a valid connection id */
01317   if (!ConnectionId || !*ConnectionId || !commdb)
01318     return FALSE;
01319 
01320   sprintf(cmd, "com_connection_info(%s, Protocol, Type,InfoList,Status)",
01321           ConnectionId);
01322   info = icl_NewTermFromString(cmd);
01323 
01324   if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01325 
01326     ICLTerm *item = NULL;
01327     ICLTerm* templateProtocol =  icl_NewStruct("protocol", 1, icl_NewVar("_"));
01328     ICLTerm* templateStatus =  icl_NewStruct("status", 1, icl_NewVar("_"));
01329     ICLTerm* templateType =  icl_NewStruct("type", 1, icl_NewVar("_"));
01330 
01331     /* DEBUG */
01332     /* printf("Result from solve %s\n", icl_NewStringFromTerm(result));*/
01333 
01334     res = TRUE;
01335     item = icl_NthTerm(result, 1);
01336 
01337     if (icl_Unify(templateProtocol, GInfo, NULL)) {
01338       if (Result!=NULL) {
01339         *Result = icl_CopyTerm(icl_NthTerm(item, 2));
01340       }
01341       res = TRUE;
01342     }
01343     else if (icl_Unify(templateStatus, GInfo, NULL)){
01344       if (Result!=NULL) {
01345   *Result = icl_CopyTerm(icl_NthTerm(item, 5));
01346       }
01347       res = TRUE;
01348     }
01349     else if (icl_Unify(templateType, GInfo, NULL)){
01350       if (Result!=NULL) {
01351   *Result = icl_CopyTerm(icl_NthTerm(item, 3));
01352       }
01353       res = TRUE;
01354     }
01355     else if (Result!=NULL) {
01356       ICLTerm* tempList = icl_NthTerm(item, 4);
01357       /* printf("List %s\n", icl_NewStringFromTerm(tempList)); */
01358       res = icl_Member(GInfo, tempList, Result);
01359     }
01360 
01361     icl_Free(templateProtocol);
01362     icl_Free(templateStatus);
01363     icl_Free(templateType);
01364     icl_Free(result);
01365   }
01366   else {
01367     res = FALSE;
01368   }
01369   icl_Free(info);
01370 
01371   return res;
01372 }
01373 
01374 
01379 EXPORT_MSCPP
01380 int EXPORT_BORLAND
01381 com_GetConnectionId(char **ConnectionId, ICLTerm *GInfo)
01382 {
01383   static ICLTerm *info = NULL;
01384   ICLTerm *result = NULL;
01385   int res = FALSE;
01386 
01387   if (!icl_IsValid(info)) {
01388     info = icl_NewTermFromData("com_connection_info(ConnectionId,Protocol,Type,InfoList,Status)",63);
01389   }
01390 
01391   /*
01392    * Retrieve all connection_info terms from the db
01393    */
01394   if (db_Solve(commdb, info, ICL_EMPTY, &result)) {
01395 
01396     ICLListType *reslist = icl_List(result);
01397 
01398     while(reslist != NULL) {
01399       ICLTerm *current = reslist->elt;
01400       ICLTerm *infolist = icl_NthTerm(current, 4);
01401       reslist = reslist->next;
01402 
01403       /*
01404        * check for the various types of info to match against
01405        */
01406       if(icl_ParamValue(icl_Functor(GInfo), icl_NthTerm(GInfo, 1),
01407                         infolist, NULL)) {
01408         *ConnectionId = strdup(icl_Str(icl_NthTerm(current, 1)));
01409         res = TRUE;
01410         break;
01411       }
01412     }
01413     icl_Free(result);
01414     /* Free reslist */
01415   }
01416   return res;
01417 } // end of com_GetConnectionId
01418 
01419 /*
01420  * name:    com_SelectEvent
01421  * purpose:
01422  * remarks:
01423  * returns: True if success
01424  * @deprecated use com_SelectEventFromAllIds
01425  */
01426 EXPORT_MSCPP
01427 int EXPORT_BORLAND
01428 com_SelectEvent(char* inConnectionId, double timeout, ICLTerm **event){
01429   ICLTerm* readerTerm;
01430   ICLTerm* query;
01431   TermReader* reader;
01432   char* readerString;
01433 
01434   query = icl_NewTermFromData("reader(_)",9);
01435   if(!com_GetInfo(inConnectionId, query, &readerTerm)) {
01436     icl_Free(query);
01437     fprintf(stderr, "com_SelectEvent could not find reader\n");
01438     return FALSE;
01439   }
01440   readerString = icl_Str(icl_NthTerm(readerTerm, 1));
01441   readerString = strdup(readerString);
01442   icl_stRemoveQuotes(readerString);
01443   reader = (TermReader*)stringToPointer(readerString);
01444   free(readerString);
01445   *event = termReader_getNextTerm(reader, timeout);
01446   icl_Free(query);
01447   icl_Free(readerTerm);
01448   if(*event == NULL) {
01449     int te = termReader_getError(reader);
01450     fprintf(stderr, "com_SelectEvent error in reading term: %i\n", te);
01451     return FALSE;
01452   }
01453   else {
01454     return TRUE;
01455   }
01456 }
01457 
01461 EXTERN void comPrintError(char *str, ...) {
01462   char buf[512];
01463   va_list ptr;
01464   va_start(ptr,str);
01465   vsprintf(buf,str,ptr);
01466   printf("COM ERROR : %s\n",  buf);
01467   va_end(ptr);
01468 }
01469 
01470 /****************************************************************************
01471  * Definitions for MICROSOFT WINDOWS Specific functions
01472  ****************************************************************************/
01473 
01474 #ifdef _WINDOWS   /*---------------------------------------------------*/
01475 
01476 int EXPORT_MSCPP tcpStartup()
01477 {
01478   WORD wVersionRequested;
01479   WSADATA wsaData;
01480 
01481   /* Asking for version 1.1 */
01482   wVersionRequested=MAKEWORD(1,1);
01483 
01484   if (WSAStartup(wVersionRequested,&wsaData)!=0)
01485     return (0);
01486 
01487   if (LOBYTE(wsaData.wVersion)!=1 ||
01488       HIBYTE(wsaData.wVersion)!=1) {
01489     WSACleanup();
01490     return(0);
01491   }
01492 
01493   return(1);
01494 }
01495 
01496 void EXPORT_MSCPP tcpTestDLL(void)
01497 {
01498   MessageBox(NULL, "Testing, one ,two, ...", "TCP Dll", MB_OK);
01499 }
01500 
01501 #endif /* _WINDOWS */
01502 
01503 /****************************************************************************
01504  * General definitions
01505  ****************************************************************************/
01506 
01511 void tcpShutdown(SOCKET socket) {
01512   shutdown(socket,2);
01513 #ifdef _WINDOWS
01514   shutdown(socket, 0);
01515 #else
01516   close(socket);
01517 #endif
01518 }
01519 
01523 struct hostent* getHostEntry(char const* hostName)
01524 {
01525   struct hostent* he = gethostbyname(hostName);
01526   u_long addr;
01527   if (!he) {
01528     addr = inet_addr(hostName);
01529     if ((int)addr != -1) {
01530       he = gethostbyaddr((char *)&addr, sizeof(u_long), AF_INET);
01531     }
01532   }
01533 
01534   return he;
01535 }
01536 
01543 SOCKET tcpConnect(char *hostName, int port)
01544 {
01545   SOCKET s;
01546   int ret;
01547   struct sockaddr_in saddr;
01548   struct hostent *he;
01549 
01550   he = getHostEntry(hostName);
01551   if (!he) {
01552     printf("TCP - Could not resolve host: %s\n", hostName);
01553     return -1;
01554   }
01555 
01556   ret = socket(AF_INET, SOCK_STREAM, 0);
01557 
01558   if (ret < 0) {
01559     printf("TCP - Could not create socket.\n");
01560     return -1;
01561   }
01562   else s = ret;
01563 
01564 
01565 #ifndef _WINDOWS
01566   memset((char*)&saddr, 0, sizeof(saddr));
01567   memcpy((char*)&saddr.sin_addr.s_addr, he->h_addr, he->h_length);
01568   memset(&(saddr.sin_zero), '\0', 8);
01569 #else
01570   memcpy((char FAR *) &saddr.sin_addr, (char FAR *)he->h_addr, he->h_length);
01571 #endif
01572 
01573   saddr.sin_family = AF_INET;
01574   saddr.sin_port = htons(port);
01575 
01576   if (connect(s, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
01577     printf("TCP - Could not connect to %s #%d\n", hostName,port);
01578     perror("Reason");
01579     tcpShutdown(s);
01580     return -1;
01581   }
01582 
01583   return s;
01584 }
01585 
01586 
01593 int tcpSelect(SOCKET socket, double timeout) {
01594 
01595   struct timeval  time, *timep = NULL;
01596 #ifndef _WINDOWS
01597   struct timezone timezone;
01598 #endif
01599 
01600   int width;
01601   fd_set readfds;
01602 
01603   if (timeout > 0) {
01604 #ifndef _WINDOWS
01605     if(gettimeofday(&time, &timezone) == -1)
01606       printf("TCP - error getting time of data\n");
01607     else {
01608 #endif
01609       long i;
01610       i = (long)timeout;        /* Convert double to integer, loss of data ! */
01611       time.tv_sec = i;    /* Add number of seconds to current time */
01612       time.tv_usec = 0.5 + (timeout - i) * 1e6;  /* Add fractional msecs */
01613       timep = &time;
01614 #ifndef _WINDOWS
01615     }
01616 #endif
01617   }
01618 
01619   width = socket + 1;          /* Width must include current fd   */
01620   FD_ZERO(&readfds);           /* Clear fds                       */
01621   FD_SET(socket, &readfds);    /* Add file descriptor to read set */
01622 
01623   return select(width, &readfds, NULL, NULL, timep);
01624 }
01625 
01632 EXPORT_MSCPP
01633 char * EXPORT_BORLAND
01634 com_GetDefaultConnectionId()
01635 {
01636   static ICLTerm *info = NULL;
01637   ICLTerm *answers = NULL;
01638   char *result = NULL;
01639 
01640   if (!icl_IsValid(info)) {
01641     info = icl_NewTermFromData(
01642       "com_connection_info(Id,Protocol,Type,InfoList,Status)",53);
01643   }
01644 
01645   /* Make sure there's a valid database */
01646   if (!commdb)
01647     return NULL;
01648 
01649   if (db_Solve(commdb, info, ICL_EMPTY, &answers)) {
01650     ICLTerm *item = icl_NthTerm(answers, 1);
01651     result = strdup(icl_Str(icl_NthTerm(item, 1)));
01652   }
01653   icl_Free(answers);
01654   return result;
01655 }
01656 
01662 EXPORT_MSCPP
01663 void EXPORT_BORLAND
01664 cnx_CanonicalAddress(ICLTerm *address, SOCKET socket, ICLTerm **result)
01665 {
01666   ICLTerm *addressStruct = icl_NewTermFromData("tcp(Host,Port)",14);
01667 
01668   if (icl_Unify(addressStruct, address, NULL) && (socket>0)) {
01669     struct sockaddr_in* ptr_distaddr;
01670     struct sockaddr distaddr;
01671     int remotePortNumber = -1;
01672     char remoteHostName[255];
01673 
01674 #ifdef _WINDOWS
01675     int addrlen = sizeof(distaddr);
01676 #else
01677     socklen_t addrlen = sizeof(distaddr);
01678 #endif
01679 
01680     sprintf(remoteHostName, "unknown");
01681     ptr_distaddr = (struct sockaddr_in *)&(distaddr);
01682 
01683     if (getpeername(socket, &distaddr, &addrlen)==0) {
01684 
01685       /* HACK
01686          ptr_distaddr->sin_port does not work under windows !
01687          #ifndef _WINDOWS
01688          remotePortNumber = ptr_distaddr->sin_port;
01689          #else
01690          remotePortNumber = icl_Int(icl_NthTerm(address, 2));
01691          #endif
01692       */
01693       remotePortNumber = icl_Int(icl_NthTerm(address, 2));
01694 
01695       /* To get the host name */
01696       /*
01697         disthost=gethostbyaddr((const void*)(&ptr_distaddr->sin_addr), 4, AF_INET);
01698         if (disthost)
01699         sprintf(remoteHostName,"%s",disthost->h_name);
01700       */
01701       /* To get the IP number */
01702       sprintf(remoteHostName,"%s",inet_ntoa(ptr_distaddr->sin_addr));
01703 
01704 
01705       if (result!=NULL)
01706         *result = icl_NewStruct("addr", 1,
01707                                 icl_NewStruct("tcp", 2,
01708                                               icl_NewStr(remoteHostName),
01709                                               icl_NewInt(remotePortNumber)));
01710 
01711     }
01712     else {
01713       comPrintError("%s",
01714                     "cnx_CanonicalAddress() can only be called on CONNECTED socket!");
01715     }
01716   }
01717   icl_Free(addressStruct);
01718 }
01719 
01720 /* -------------------------------------------------------------------------- */
01721 
01722 // new functions added to support direct_connect
01723 
01724 static void checkEventBuffer() {
01725   if (eventBuffer == NULL) {
01726     eventBuffer = icl_NewList(NULL);
01727   }
01728 } // end of checkEventBuffer(
01729 
01730 static void com_accept_new_connection(int listenersocket) {
01731   int connection = -1;
01732   char* namestring = NULL;
01733   ICLTerm *readerInfo = NULL;
01734   ICLTerm *senderInfo = NULL;
01735   ICLTerm *ev_conn = NULL;
01736 
01737   connection = accept(listenersocket, NULL, NULL);
01738 
01739   if (connection > 0) {
01740     ICLTerm *canonicalOtherAddress = NULL;
01741     ICLTerm *newConnectionInfo = NULL;
01742     ICLTerm *tempList = icl_NewList(NULL);
01743     char *ConnectionId = new_direct_client_id();
01744     ICLTerm *templateTcpAddr = icl_NewTermFromData("tcp(Host,Port)",14);
01745     char buf[1000];
01746 
01747     memset(buf, 0, sizeof(buf));
01748     cnx_CanonicalAddress(templateTcpAddr, connection, &canonicalOtherAddress);
01749 
01750     icl_AddToList(tempList, icl_NewStruct("other_address", 1, canonicalOtherAddress), FALSE);
01751     icl_AddToList(tempList, icl_NewStruct("connection", 1, icl_NewInt(connection)),FALSE);
01752 
01753     {
01754       TermReader* realTr = NULL;
01755       TermSender* realTs = NULL;
01756       char* readerPtr;
01757       char* readerInfoBuf;
01758       char* senderPtr;
01759       char* senderInfoBuf;
01760 
01761       if (com_is_binary_listener(listenersocket)) {
01762         BinaryTermReader* btr;
01763         BinaryTermSender* bts;
01764         realTr = termReader_create();
01765         btr = binaryTermReader_create(realTr, connection);
01766         realTs = termSender_create();
01767         bts = binaryTermSender_create(realTs, connection);
01768       }
01769       else {
01770         StringTermReader* str;
01771         StringTermSender* sts;
01772         realTr = termReader_create();
01773         str = stringTermReader_create(realTr, connection);
01774         realTs = termSender_create();
01775         sts = stringTermSender_create(realTs, connection);
01776       }
01777 
01778       readerPtr = pointerToString(realTr);
01779 #ifdef NORMAL_GC
01780       /* so we don't lose the pointer */
01781       /*
01782       if(termReaders == NULL) {
01783         termReaders = g_ptr_array_new();
01784       }
01785       g_ptr_array_add(termReaders, realTr);
01786       */
01787       termReaders[++termReadersIdx] = realTr;
01788 #endif
01789       readerInfoBuf = (char*)malloc(strlen(readerPtr) + 10 + 1);
01790       strncpy(readerInfoBuf, "reader('", 8);
01791       strncpy(readerInfoBuf + 8, readerPtr, strlen(readerPtr));
01792       readerInfoBuf[strlen(readerPtr) + 8] = '\'';
01793       readerInfoBuf[strlen(readerPtr) + 9] = ')';
01794       readerInfoBuf[strlen(readerPtr) + 10] = '\0';
01795       readerInfo = icl_NewTermFromString(readerInfoBuf);
01796       free(readerPtr);
01797       free(readerInfoBuf);
01798       senderPtr = pointerToString(realTs);
01799 #ifdef NORMAL_GC
01800       /* so we don't lose the pointer */
01801       /*
01802       if(termSenders == NULL) {
01803         termSenders = g_ptr_array_new();
01804       }
01805       g_ptr_array_add(termSenders, realTs);
01806       */
01807       termSenders[++termSendersIdx] = realTs;
01808 #endif
01809       senderInfoBuf = (char*)malloc(strlen(senderPtr) + 10 + 1);
01810       strncpy(senderInfoBuf, "sender('", 8);
01811       strncpy(senderInfoBuf + 8, senderPtr, strlen(senderPtr));
01812       senderInfoBuf[strlen(senderPtr) + 8] = '\'';
01813       senderInfoBuf[strlen(senderPtr) + 9] = ')';
01814       senderInfoBuf[strlen(senderPtr) + 10] = '\0';
01815       senderInfo = icl_NewTermFromString(senderInfoBuf);
01816       free(senderPtr);
01817       free(senderInfoBuf);
01818     }
01819 
01820     icl_AddToList(tempList, readerInfo, FALSE);
01821     icl_AddToList(tempList, senderInfo, FALSE);
01822 
01823     newConnectionInfo = icl_NewStruct("com_connection_info", 5,
01824                                       icl_NewStr(ConnectionId),
01825                                       icl_NewStr("tcp"),
01826                                       icl_NewStr("client"),
01827                                       tempList,
01828                                       icl_NewStr("connected"));
01829 
01830     if (newConnectionInfo) {
01831      db_Assert(commdb, newConnectionInfo, ICL_EMPTY);
01832      icl_Free(newConnectionInfo);
01833     }
01834     else {
01835      printf("Error! %s:%i com_connection_info could not be constructed.\n", __FILE__, __LINE__);
01836     }
01837 
01838     {
01839     ICLTerm *ConnEvent = NULL;
01840     ICLTerm *EventTerm = NULL;
01841     ICLTerm *t1   = NULL;
01842     // Get the connection acknowledgement from the same ConnectionId.
01843     // A timeout of 0.0 means we'll block on this socket.
01844     com_select_event_from_socket(connection, 0.0, &EventTerm);
01845 
01846     ConnEvent = icl_NthTerm(icl_NthTerm(EventTerm, 1), 1);
01847 
01848     if (!icl_Unify(ConnEvent,
01849                   (t1 = icl_NewStruct("ev_connect",
01850                                       1,
01851                                       icl_NewVar("_"))),
01852                    NULL)) {
01853       fprintf(stderr, "Warning in com_accept_new_connection: ");
01854       fprintf(stderr, "Expected ev_connect, got %s\n", icl_Functor(ConnEvent));
01855       fflush(stderr);
01856     }
01857     icl_Free(t1);
01858     icl_Free(EventTerm);
01859    }
01860 
01861     namestring = oaa_name_string();
01862     sprintf(buf, "event(ev_connected([other_name('%s'), "
01863                  "other_language(c), other_type(client), "
01864                  "other_version(%s), ascii(%d), binary(%d)]),[])",
01865                  namestring, oaa_library_version_str,
01866                  stringPort, binaryPort);
01867     icl_stFree(namestring);
01868 
01869     ev_conn = icl_NewTermFromString(buf);
01870     com_SendTerm(ConnectionId, ev_conn);
01871     icl_Free(ev_conn);
01872     icl_Free(templateTcpAddr);
01873     free(ConnectionId);
01874   }
01875 
01876 } // end of com_accept_new_connection
01877 
01878 static int com_add_event_to_buffer(ICLTerm *event) {
01879   int answer = FALSE;
01880   checkEventBuffer();
01881   // add to end of list
01882   answer = icl_AddToList(eventBuffer, event, TRUE);
01883   return answer;
01884 } //end of com_add_event_to_buffer
01885 
01886 static int com_disconnect_all_connection_ids() {
01887   int answer = TRUE;
01888   ICLTerm *cids = NULL;
01889   ICLListType *cidslist = NULL;
01890   com_find_all_connection_ids(&cids);
01891   cidslist = icl_List(cids);
01892   while (icl_ListHasMoreElements(cidslist)) {
01893     char *cid = icl_Str(icl_ListElement(cidslist));
01894     if (cid != NULL) {
01895       answer = com_Disconnect(cid) && answer;
01896     }
01897     cidslist = icl_ListNextElement(cidslist);
01898   }
01899   icl_Free(cids);
01900   return answer;
01901 } // end of com_disconnect_from_all_connection_ids
01902 
01903 
01904 static int com_find_all_connected_sockets(ICLTerm *ids, ICLTerm **sockets) {
01905   int numberOfSockets = 0;
01906   ICLListType *idlist = icl_List(ids);
01907   *sockets = icl_NewList(NULL);
01908   while(icl_ListHasMoreElements(idlist)) {
01909     char *id = icl_Str(icl_ListElement(idlist));
01910     int socket = com_get_socket_from_connection_id(id);
01911     if (socket > 0) {
01912       icl_AddToList(*sockets, icl_NewInt(socket), TRUE);
01913       numberOfSockets++;
01914     }
01915     idlist = icl_ListNextElement(idlist);
01916   }
01917   return numberOfSockets;
01918 } // end of com_find_all_connected_sockets
01919 
01920 
01921 static int com_find_all_connection_ids(ICLTerm **ids) {
01922   int answer = FALSE;
01923   ICLTerm *result = NULL;
01924   ICLListType *resultlist;
01925   static ICLTerm *info = NULL;
01926 
01927   if (!icl_IsValid(info)) {
01928     info = icl_NewTermFromData(
01929       "com_connection_info(ConnectionId,Protocol,Type,InfoList,Status)",63);
01930   }
01931 
01932   if ( (answer = db_Solve(commdb, info, ICL_EMPTY, &result)) ) {
01933     resultlist = icl_List(result);
01934     *ids = icl_NewList(NULL);
01935     while(icl_ListHasMoreElements(resultlist)) {
01936       ICLTerm *item = icl_ListElement(resultlist);
01937       ICLTerm *id = icl_NthTerm(item,1);
01938       icl_AddToList(*ids, icl_CopyTerm(id), FALSE);
01939       resultlist = icl_ListNextElement(resultlist);
01940     }
01941   }
01942   icl_Free(result);
01943   return answer;
01944 } // end of com_find_all_connection_ids
01945 
01946 static char* com_get_connection_id_from_socket(int socket) {
01947   char *answer = NULL;
01948   ICLTerm *cids = NULL;
01949   ICLListType *cidslist = NULL;
01950   com_find_all_connection_ids(&cids);
01951   cidslist = icl_List(cids);
01952   while (icl_ListHasMoreElements(cidslist)) {
01953     char *cid = icl_Str(icl_ListElement(cidslist));
01954     if (socket == com_get_socket_from_connection_id(cid)) {
01955       answer = strdup(cid);
01956       break;
01957     }
01958     cidslist = icl_ListNextElement(cidslist);
01959   }
01960   icl_Free(cids);
01961   return answer;
01962 } // end of com_get_connection_id_from_socket
01963 
01967 EXPORT_MSCPP
01968 int EXPORT_BORLAND
01969 com_GetEventFromConnection(char *ConnectionId, double timeout, ICLTerm **event) {
01970   int socketNumber = com_get_socket_from_connection_id(ConnectionId);
01971   return com_select_event_from_socket(socketNumber, timeout, event);
01972 } // end of com_GetEventFromConnection
01973 
/**
 * Determines the dotted-quad IPv4 address of the local host.
 *
 * The host name is obtained with gethostname() and resolved with
 * gethostbyname(); the first address of the result is formatted with
 * inet_ntoa().
 *
 * @return a strdup()'ed string owned by the caller, or NULL (after printing
 *         a warning on stderr) when the address cannot be determined
 */
static char *com_get_localhost_ip_address(void) {
    struct hostent *h = NULL;
    struct in_addr addr;
    char name[MAXHOSTNAMELEN + 1];

    if (gethostname(name, sizeof(name)) == 0) {
      // gethostname() need not NUL-terminate a truncated name
      name[sizeof(name) - 1] = '\0';
      h = gethostbyname(name);
    }

    // Require an IPv4 result that really carries at least one address
    // before dereferencing the address list.
    if ( (h == NULL) ||
         (h->h_addrtype != AF_INET) ||
         (h->h_addr_list == NULL) ||
         (h->h_addr_list[0] == NULL) ) {
      fprintf(stderr,"Warning: could not determine address for localhost\n");
      return NULL;
    }

    // copy instead of casting: the address bytes carry no alignment guarantee
    memcpy(&addr, h->h_addr_list[0], sizeof(addr));
    return strdup(inet_ntoa(addr));
}
01992 
01993 static int com_is_binary_listener(int socket) {
01994   return (socket == binaryListener);
01995 } // end of com_is_binary_listener
01996 
01997 static int com_is_listening() {
01998   return stringListener > 0;
01999 } // end of com_is_listening
02000 
02001 
02002 
02009 static int com_read_events_into_buffer(double timeout) {
02010   int numberOfEventsAdded = 0;
02011   ICLTerm *ids = NULL;
02012   ICLTerm *connectedSockets = NULL;
02013   ICLListType *connectedSocketList = NULL;
02014 
02015   struct timeval  time;
02016   struct timeval* timep = NULL;
02017   int width = 0;
02018   fd_set read_set;
02019 
02020   // timeout = 0.0 means select should block
02021   if(timeout == 0 || timeout > 1) {
02022     time.tv_sec = 1;
02023     time.tv_usec = 0;
02024     timep = &time;
02025   }
02026   else if(timeout > 0) {
02027     long i = (long)floor(timeout);
02028     time.tv_sec = i;
02029     time.tv_usec = (long)floor((timeout - i) * 1e6);
02030     timep = &time;
02031   }
02032 
02033   FD_ZERO(&read_set);
02034 
02035   // if we have listener sockets, add them to the fd_set
02036   if (com_is_listening()) {
02037     FD_SET(stringListener, &read_set);
02038     FD_SET(binaryListener, &read_set);
02039   }
02040 
02041   com_find_all_connection_ids(&ids);
02042   com_find_all_connected_sockets(ids, &connectedSockets);
02043   connectedSocketList = icl_List(connectedSockets);
02044 
02045   // add connected sockets to fd_set
02046   while(icl_ListHasMoreElements(connectedSocketList)) {
02047     int socket = icl_Int(icl_ListElement(connectedSocketList));
02048     FD_SET(socket, &read_set);
02049     width = MAX(width, socket+1);
02050     connectedSocketList = icl_ListNextElement(connectedSocketList);
02051   }
02052 
02053   if (select(width, &read_set, NULL, NULL, timep) <= 0) {
02054     // nothing to do...clean up and return
02055     icl_Free(ids);
02056     icl_Free(connectedSockets);
02057     return 0;
02058   }
02059 
02060   if (com_is_listening()) {
02061     // handle requests to connect, if any
02062     if (FD_ISSET(stringListener, &read_set)) {
02063       com_accept_new_connection(stringListener);
02064       FD_CLR(stringListener, &read_set);
02065     }
02066 
02067     if (FD_ISSET(binaryListener, &read_set)) {
02068       com_accept_new_connection(binaryListener);
02069       FD_CLR(stringListener, &read_set);
02070     }
02071   }
02072 
02073   // check connected sockets
02074   connectedSocketList = icl_List(connectedSockets);
02075   while(icl_ListHasMoreElements(connectedSocketList)) {
02076     int socket = icl_Int(icl_ListElement(connectedSocketList));
02077 
02078     // read incoming events to the eventbuffer
02079     if (FD_ISSET(socket, &read_set)) {
02080       ICLTerm *event = NULL;
02081       double totalTime = 0;
02082       while(((timeout == 0) || (totalTime < timeout)) &&
02083             com_select_event_from_socket(socket, 0.01, &event)) {
02084         com_add_event_to_buffer(event);
02085         ++numberOfEventsAdded;
02086         totalTime += 0.01;
02087         if(icl_IsStruct(event) &&
02088            (icl_NumTerms(event) == 1) &&
02089            (strcmp(icl_Functor(event), "event") == 0)) {
02090           ICLTerm* firstArg = icl_NthTerm(event, 1);
02091           if(icl_IsStr(firstArg) &&
02092              (strcmp(icl_Str(firstArg), "timeout") == 0)) {
02093             break;
02094           }
02095         }
02096       }
02097     }
02098     connectedSocketList = icl_ListNextElement(connectedSocketList);
02099   }
02100 
02101   icl_Free(ids);
02102   icl_Free(connectedSockets);
02103   return numberOfEventsAdded;
02104 } // end of com_read_events_into_buffer
02105 
02106 static int com_select_event_from_buffer(ICLTerm **event) {
02107   ICLListType *eventList = NULL;
02108 
02109   // bail quickly if buffer is empty
02110   checkEventBuffer();
02111   if (icl_ListLen(eventBuffer) == 0 ) {
02112     *event = NULL;
02113     return FALSE;
02114   }
02115 
02116   // select and remove from the head of the list...
02117   eventList = icl_List(eventBuffer);
02118   *event = eventList->elt;
02119   eventBuffer->p = eventList->next;
02120 #ifdef NORMAL_GC
02121   memset(eventList, 0, sizeof(ICLListType));
02122 #endif
02123   free(eventList);
02124 
02125   return TRUE;
02126 } // end of com_select_event_from_buffer
02127 
02132 static int com_select_event_from_socket(int socketNumber, double timeout, ICLTerm **event){
02133   ICLTerm* readerTerm;
02134   ICLTerm* query;
02135   TermReader* reader;
02136   char* readerString;
02137   char *inConnectionId;
02138   inConnectionId = com_get_connection_id_from_socket(socketNumber);
02139   query = icl_NewStruct("reader", 1, icl_NewVar("_"));
02140   if(!com_GetInfo(inConnectionId, query, &readerTerm)) {
02141     icl_Free(query);
02142     fprintf(stderr, "com_select_event_from_socket could not find reader\n");
02143     icl_stFree(inConnectionId);
02144     return FALSE;
02145   }
02146   readerString = icl_Str(icl_NthTerm(readerTerm, 1));
02147   readerString = strdup(readerString);
02148   icl_stRemoveQuotes(readerString);
02149   reader = (TermReader*)stringToPointer(readerString);
02150   free(readerString);
02151   CHECK_LEAKS();
02152   *event = termReader_getNextTerm(reader, timeout);
02153   CHECK_LEAKS();
02154   icl_Free(query);
02155   icl_Free(readerTerm);
02156   if(*event == NULL) {
02157     int te = termReader_getError(reader);
02158     // print an error message if it is not an "ordinary" error
02159     if ( (te != TERMREADER_OKAY) &&
02160          (te != TERMREADER_TIMEOUT) &&
02161          (te != TERMREADER_READERR) &&
02162          (te != TERMREADER_NOCONN) ) {
02163       fprintf(stderr,
02164         "com_select_event_from_socket error in reading term: %i\n", te);
02165     }
02166     // disconnect if it was not a timeout error
02167     if ( (te != TERMREADER_OKAY) &&
02168          (te != TERMREADER_TIMEOUT) ) {
02169       com_Disconnect(inConnectionId);
02170 //      fprintf(stderr, "disconnected connection_id: %s socket: %d\n",
02171 //              inConnectionId, socketNumber);fflush(stderr);
02172     }
02173     icl_stFree(inConnectionId);
02174     return FALSE;
02175   }
02176   else {
02177     // if present, copy connection id into params (for direct_connect)
02178     if (!strcmp(icl_Functor(*event),"term")) {
02179       ICLTerm *CID = icl_NewStruct("connection_id",1,icl_NewStr(inConnectionId));
02180       if (icl_NumTerms(*event) == 1) {
02181         icl_AddToList(icl_NthTerm(icl_NthTerm(*event,1),2), CID, TRUE);
02182       }
02183       else if (icl_NumTerms(*event) == 2) {
02184         icl_AddToList(icl_NthTerm(icl_NthTerm(*event,2),2), CID, TRUE);
02185       }
02186     }
02187     icl_stFree(inConnectionId);
02188     return TRUE;
02189   }
02190 } // end of com_select_event_from_socket
02191 
02192 static ICLTerm* initiatedTerm = NULL;
02196 static void sendHeartbeats()
02197 {
02198   ICLTerm* initiatedConnections = NULL;
02199   ICLListType* listElem;
02200 
02201   if(!initiatedTerm) {
02202     initiatedTerm = icl_NewStruct("initiated", 1, icl_NewStr("true"));
02203   }
02204 
02205   com_getMatchingInfos(initiatedTerm, &initiatedConnections);
02206 
02207   for(listElem = icl_List(initiatedConnections); listElem; listElem = icl_ListNext(listElem)) {
02208     char* connectionId = icl_Str(icl_NthTerm(icl_ListElt(listElem), 1));
02209     if(oaa_SupportsSequenceNumbers(connectionId)) {
02210       ICLTerm* heartbeatEvent = icl_NewStruct("event", 2, icl_NewStr("ev_heartbeat"), icl_NewList(NULL));
02211       com_SendTerm(connectionId, heartbeatEvent);
02212       icl_Free(heartbeatEvent);
02213     }
02214   }
02215 
02216   icl_Free(initiatedConnections);
02217 }
02218 
02222 static void checkForFailedHeartbeats()
02223 {
02224   ICLTerm* initiatedConnections = NULL;
02225   ICLListType* listElem;
02226   GTimeVal now;
02227   glong nowSec;
02228 
02229   g_get_current_time(&now);
02230   nowSec = now.tv_sec;
02231 
02232   com_getMatchingInfos(initiatedTerm, &initiatedConnections);
02233 
02234   for(listElem = icl_List(initiatedConnections); listElem; listElem = icl_ListNext(listElem)) {
02235     ICLTerm* lastHeartbeat = NULL;
02236     ICLTerm* infoList = icl_NthTerm(icl_ListElt(listElem), 4);
02237     if(!icl_ParamValue("last_heartbeat", NULL, infoList, &lastHeartbeat)) {
02238       continue;
02239     }
02240     else {
02241       char* connectionId = icl_Str(icl_NthTerm(icl_ListElt(listElem), 1));
02242       glong prevHeartbeatSec = icl_Int(icl_NthTerm(lastHeartbeat, 1));
02243       if(nowSec - prevHeartbeatSec > HEARTBEAT_TIMEOUT) {
02244         ICLTerm* socketTerm;
02245         int socket;
02246         if(!icl_ParamValue("connection", NULL, infoList, &socketTerm)) {
02247           fprintf(stderr, "Unable to determine socket for failed connection %s\n", connectionId);
02248           return;
02249         }
02250         if(!(socket = icl_Int(icl_NthTerm(socketTerm, 1)))) {
02251           char* socketTermString = icl_NewStringFromTerm(socketTerm);
02252           fprintf(stderr, "Unable to determine socket from socket term %s\n", socketTermString);
02253           icl_stFree(socketTermString);
02254           return;
02255         }
02256         close_socket(socket);
02257         {
02258           ICLTerm* senderQuery = icl_NewStruct("sender", 1, icl_NewVar("_"));
02259           ICLTerm* senderTerm = NULL;
02260           if(!com_GetInfo(connectionId, senderQuery, &senderTerm)) {
02261             fprintf(stderr, "Unable to find sender pointer for connection %s\n", connectionId);
02262           }
02263           if(senderTerm) {
02264             char *pointerStr = strdup(icl_Str(icl_NthTerm(senderTerm, 1)));
02265             TermSender* ts;
02266             icl_stRemoveQuotes(pointerStr);
02267             ts = (TermSender*)stringToPointer(pointerStr);
02268             free(ts);
02269             free(pointerStr);
02270             com_RemoveInfo(connectionId, senderQuery);
02271           }
02272           icl_Free(senderQuery);
02273         }
02274         {
02275           ICLTerm* readerQuery = icl_NewStruct("reader", 1, icl_NewVar("_"));
02276           ICLTerm* readerTerm = NULL;
02277           if(!com_GetInfo(connectionId, readerQuery, &readerTerm)) {
02278             fprintf(stderr, "Unable to find reader pointer for connection %s\n", connectionId);
02279           }
02280           if(readerTerm) {
02281             char *pointerStr = strdup(icl_Str(icl_NthTerm(readerTerm, 1)));
02282             TermSender* ts;
02283             icl_stRemoveQuotes(pointerStr);
02284             ts = (TermSender*)stringToPointer(pointerStr);
02285             free(ts);
02286             free(pointerStr);
02287             com_RemoveInfo(connectionId, readerQuery);
02288           }
02289           icl_Free(readerQuery);
02290         }
02291         com_RemoveInfo(connectionId, lastHeartbeat);
02292         {
02293           ICLTerm* reconnectTerm = icl_NewStruct("needs_reconnect", 1, icl_NewStr("true"));
02294           com_UpdateInfo(connectionId, reconnectTerm);
02295         }
02296         printf("Failed connection %s\n", connectionId);
02297       }
02298       icl_Free(lastHeartbeat);
02299     }
02300   }
02301   if(initiatedConnections) icl_Free(initiatedConnections);
02302 }
02303 
02304 static void tryReconnects()
02305 {
02306   ICLTerm* reconnectConnections = NULL;
02307   //ICLListType* listElem;
02308   ICLTerm* reconnectTerm = icl_NewStruct("needs_reconnect", 1, icl_NewStr("true"));
02309   ICLTerm* otherAddressMatcher = icl_NewStruct("other_address", 1, icl_NewVar("_"));
02310   ICLTerm* myAddressMatcher = icl_NewStruct("oaa_address", 1, icl_NewVar("_"));
02311   ICLTerm* otherSequenceMatcher = icl_NewStruct("other_sequence", 1, icl_NewVar("_"));
02312   ICLListType* listElem;
02313   ICLTerm* myNameMatcher = icl_NewStruct("oaa_name", 1, icl_NewVar("_"));
02314 
02315   com_getMatchingInfos(reconnectTerm, &reconnectConnections);
02316   for(listElem = icl_List(reconnectConnections); listElem; listElem = icl_ListNext(listElem)) {
02317     char* connId = icl_Str(icl_NthTerm(icl_ListElt(listElem), 1));
02318     ICLTerm* otherAddress;
02319     ICLTerm* connectAddress;
02320     ICLTerm* myAddress;
02321     ICLTerm* otherSequence;
02322     ICLTerm* myName;
02323     int lastSeen;
02324 
02325     com_RemoveInfo(connId, reconnectTerm);
02326     if(!icl_Member(otherAddressMatcher, icl_NthTerm(icl_ListElt(listElem), 4), &otherAddress)) {
02327       fprintf(stderr, "reconnect required for connection %s, but no other_address found\n", connId);
02328       goto cleanup;
02329     }
02330     connectAddress = icl_CopyTerm(icl_NthTerm(icl_NthTerm(otherAddress, 1), 1));
02331     if(!com_GetInfo(connId, myAddressMatcher, &myAddress)) {
02332       fprintf(stderr, "reconnect required for connection %s, but no oaa_address found\n", connId);
02333       goto cleanup;
02334     }
02335     if(!com_GetInfo(connId, myNameMatcher, &myName)) {
02336       fprintf(stderr, "reconnect required for connection %s, but no oaa_name found\n", connId);
02337       goto cleanup;
02338     }
02339     if(!com_GetInfo(connId, otherSequenceMatcher, &otherSequence)) {
02340       lastSeen = -1;
02341     }
02342     else {
02343       lastSeen = icl_Int(icl_NthTerm(otherSequence, 1));
02344       icl_Free(otherSequence);
02345     }
02346     if(com_Connect(connId, connectAddress)) {
02347       com_add_event_to_buffer(
02348         icl_NewStruct("term", 1, 
02349           icl_NewStruct("event", 2, 
02350             icl_NewStruct("ev_reconnected_needs_handshake", 5,
02351               icl_NewStr(connId),
02352               icl_NewStr(icl_Str(icl_NthTerm(myName, 1))),
02353               icl_NewStruct("reconnect", 1, icl_NewInt(lastSeen)),
02354               icl_NewStruct("other_address", 1, icl_CopyTerm(icl_NthTerm(myAddress, 1))),
02355               icl_NewStruct("other_name", 1, icl_NewStr(icl_Str(icl_NthTerm(myName, 1))))
02356               ),
02357             icl_NewList(NULL))));
02358     }
02359 
02360     icl_Free(connectAddress);
02361   }
02362 
02363 cleanup:
02364   icl_Free(reconnectConnections);
02365   icl_Free(otherAddressMatcher);
02366   icl_Free(myAddressMatcher);
02367   icl_Free(otherSequenceMatcher);
02368   icl_Free(myNameMatcher);
02369   icl_Free(reconnectTerm);
02370 }
02371 
02372 int initializedLastFired = FALSE;
02373 static GTimeVal lastFired;
02374 
02378 static void fireTimers()
02379 {
02380   if(!initializedLastFired) {
02381     g_get_current_time(&lastFired);
02382     initializedLastFired = TRUE;
02383   }
02384   else {
02385     GTimeVal now;
02386     guint64 udiff;
02387     g_get_current_time(&now);
02388     udiff = (now.tv_sec - lastFired.tv_sec) * 1000000 + (now.tv_usec - lastFired.tv_usec);
02389     if(udiff > 1000000) {
02390       lastFired.tv_sec = now.tv_sec;
02391       lastFired.tv_usec = now.tv_usec;
02392     }
02393     else {
02394       return;
02395     }
02396   }
02397 
02398   sendHeartbeats();
02399   checkForFailedHeartbeats();
02400   tryReconnects();
02401 }
02402 
02412 EXPORT_MSCPP
02413 int EXPORT_BORLAND
02414 com_SelectEventFromAllIds(double timeout, ICLTerm **event) {
02415   int answer = FALSE;
02416   checkEventBuffer();
02417 
02418   if (com_select_event_from_buffer(event)) {
02419     CHECK_LEAKS();
02420     answer = TRUE;
02421   }
02422   else if (com_read_events_into_buffer(timeout)) {
02423     CHECK_LEAKS();
02424     answer = com_select_event_from_buffer(event);
02425     CHECK_LEAKS();
02426   }
02427   else {
02428     answer = FALSE;
02429   }
02430   fireTimers();
02431   CHECK_LEAKS();
02432   return answer;
02433 } // end of com_SelectEventFromAllIds
02434 
/**
 * Generates a fresh identifier of the form "direct_client<N>".
 *
 * N starts at 1 and increases by one on every call.
 *
 * @return a strdup()'ed string that the caller must free
 */
static char* new_direct_client_id() {
    static int nextNumber = 1;
    char idText[255];
    int thisNumber = nextNumber;

    nextNumber += 1;
    sprintf(idText, "direct_client%d", thisNumber);
    return strdup(idText);
} // end of new_direct_client_id
02444 
02445 
/**
 * Creates a TCP socket and binds it to the given port on all local
 * addresses (INADDR_ANY).
 *
 * The socket is not put into the listening state here.
 *
 * @param port  TCP port number in host byte order
 * @return the bound socket descriptor, or -1 on failure (a message is
 *         printed; no descriptor is left open)
 */
static int opentcpport(int port) {
   int socketFd; /* File descriptor for the socket */
   struct sockaddr_in addr; /* Address to receive data from */

   // Open the socket
   socketFd = socket(AF_INET, SOCK_STREAM, 0);
   if (socketFd < 0) {
      // %m is a glibc printf extension that prints strerror(errno)
      printf("opentcpport: socket open failure: %m\n");
      return -1;
   }

   // Clear the address
   memset(&addr, 0, sizeof(addr));

   // Set the address family to INET
   addr.sin_family = AF_INET;

   // Set the address mask to any address using network byte order
   addr.sin_addr.s_addr = htonl(INADDR_ANY);

   // Set the port using network byte order
   addr.sin_port = htons(port);

   // Bind the port to the socket
   if (bind(socketFd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
      // report first (closing may change errno), then release the descriptor
      printf("opentcpport: bind failure on port %d: %m\n", port);
      close_socket(socketFd);
      return -1;
   }

   // Return the file descriptor
   return socketFd;
} // end of opentcpport
02483 
02487 static int tcpListenAtPort(int port) {
02488   char listenAtString[600];
02489   ICLTerm *listenAtTerm;
02490   int socketFd;
02491 
02492   // Make sure database is initialized
02493   if (!commdb)
02494     commdb = db_NewDB();
02495 
02496     socketFd = opentcpport(port);
02497 
02498     if (listen(socketFd, 5) < 0) // backlog = 5
02499     {
02500         printf("listen() error: %m\n");
02501     }
02502 
02503     if (socketFd >= 0) {
02504         sprintf(listenAtString, "tcp_listener(%d)", socketFd);
02505     }
02506     listenAtTerm = icl_NewTermFromString(listenAtString);
02507 
02508     if (listenAtTerm) {
02509         db_Assert(commdb, listenAtTerm, ICL_EMPTY);
02510         icl_Free(listenAtTerm);
02511     }
02512     else {
02513         printf("Error! tcp_listener could not be constructed.\n");
02514     }
02515 
02516     return socketFd;
02517 } // end of tcpListenAtPort
02518 
02519 

Generated on Wed May 23 17:20:11 2007 using doxygen 1.5.2