00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifdef _WINDOWS
00022 #include <windows.h>
00023 #include <winsock.h>
00024 #include "oaa-windows.h"
00025 #else
00026 #include <sys/types.h>
00027 #include <sys/socket.h>
00028 #include <unistd.h>
00029 #include <sys/time.h>
00030 #endif
00031
00032 #include "stdpccts.h"
00033
00034 #include <string.h>
00035 #include <stdlib.h>
00036 #include <math.h>
00037 #include <stdio.h>
00038 #include "liboaa.h"
00039
00040 #include "stringtermreader.h"
00041 #include "libicl_private.h"
00042 #include "stringbuffer.h"
00043 #include "stringbuffer_private.h"
00044
00045 #ifdef _WINDOWS
00046
00047
00048
00049 enum { INITBUFFERSZ = 8388608 };
00050 enum { AMOUNTTOREAD = 8192 };
00051 #else
00052 static const size_t INITBUFFERSZ = 8;
00053 static const size_t AMOUNTTOREAD = 8192;
00054 #endif
00055
00056 struct StringTermReaderStruct
00057 {
00058 TermReader* superReader;
00059 char* currentBuffer;
00060 size_t bufCapacity;
00061 size_t bufUsed;
00062 }
00063 ;
00064
00065 void stringTermReader_cleanup(TermReader*);
00066 ICLTerm* stringTermReader_getNext(TermReader*, double);
00067 void stringTermReader_fillData(StringTermReader* sr, double timeout);
00068 void stringTermReader_addToCurrentBuffer(StringTermReader* sr, char* buf, ssize_t len);
00069
00070 StringTermReader* stringTermReader_create(TermReader* t, gint listenSocket)
00071 {
00072 StringTermReader* r = (StringTermReader*)malloc(sizeof(StringTermReader));
00073 r->superReader = t;
00074 r->currentBuffer = (char*)malloc(INITBUFFERSZ);
00075 r->bufUsed = 0;
00076 r->bufCapacity = INITBUFFERSZ;
00077 termReader_setReaderSpecificData(t, r);
00078 termReader_setSocket(t, listenSocket);
00079 termReader_setType(t, STRINGTERMREADERTYPE);
00080 termReader_setGetNextCallback(t, stringTermReader_getNext);
00081 termReader_setCleanupCallback(t, stringTermReader_cleanup);
00082 termReader_setError(t, TERMREADER_OKAY);
00083 return r;
00084 }
00085
00086 void stringTermReader_cleanup(TermReader* reader)
00087 {
00088 StringTermReader* sr = (StringTermReader*)termReader_getReaderSpecificData(reader);
00089 if(sr->currentBuffer != NULL) {
00090 free(sr->currentBuffer);
00091 }
00092 if(sr != NULL) {
00093 free(sr);
00094 }
00095 }
00096
00097 ICLTerm* stringTermReader_getNext(TermReader* reader, double timeout)
00098 {
00099 ICLTerm* nextEvent = NULL;
00100 stringbuffer_t sbuf;
00101 gboolean done = FALSE;
00102 gboolean timedOut = FALSE;
00103 gboolean hadOldData = FALSE;
00104 gboolean forceFill = FALSE;
00105 StringTermReader* sr = (StringTermReader*)termReader_getReaderSpecificData(reader);
00106
00107 CHECK_LEAKS();
00108
00109
00110
00111 while(!done) {
00112 if((sr->bufUsed == 0) || forceFill) {
00113
00114
00115
00116 stringTermReader_fillData(sr, timeout);
00117 CHECK_LEAKS();
00118 if(termReader_getError(reader) == TERMREADER_TIMEOUT) {
00119
00120
00121
00122 timedOut = TRUE;
00123 }
00124 else if(termReader_getError(reader) != TERMREADER_OKAY) {
00125 return NULL;
00126 }
00127 }
00128 else {
00129
00130
00131
00132 forceFill = FALSE;
00133 hadOldData = TRUE;
00134 }
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162 sleep_millis(0);
00163
00164 sbuf.data = sr->currentBuffer;
00165 sbuf.len = sr->bufUsed;
00166 sbuf.index = 0;
00167 CHECK_LEAKS();
00168 if((sr->bufUsed > 0) &&
00169 (parser_getNetTermFromBuf(&nextEvent, &sbuf) != FALSE)) {
00170 size_t newUsed;
00171 CHECK_LEAKS();
00172 newUsed = sr->bufUsed - stringbuffer_getIndex(&sbuf);
00173 sr->currentBuffer = memmove(sr->currentBuffer,
00174 sr->currentBuffer + stringbuffer_getIndex(&sbuf),
00175 newUsed);
00176 sr->bufUsed = newUsed;
00177
00178 done = TRUE;
00179
00180
00181
00182
00183
00184
00185
00186 parser_setDebug(FALSE);
00187 CHECK_LEAKS();
00188 return nextEvent;
00189 }
00190 CHECK_LEAKS();
00191 parser_setDebug(FALSE);
00192
00193 if(timedOut) {
00194 CHECK_LEAKS();
00195 nextEvent = icl_NewTermFromData("event(timeout)", 14);
00196 CHECK_LEAKS();
00197
00198
00199
00200 return nextEvent;
00201 }
00202 else if(termReader_getError(reader) != TERMREADER_OKAY) {
00203
00204
00205
00206 CHECK_LEAKS();
00207 return NULL;
00208 }
00209 else if(hadOldData) {
00210
00211
00212
00213 forceFill = TRUE;
00214 hadOldData = FALSE;
00215 CHECK_LEAKS();
00216 continue;
00217 }
00218 else {
00219
00220
00221
00222 CHECK_LEAKS();
00223 continue;
00224 }
00225 }
00226
00227
00228
00229 CHECK_LEAKS();
00230 return NULL;
00231 }
00232
00233 void stringTermReader_fillData(StringTermReader* sr, double timeout)
00234 {
00235 char buf[AMOUNTTOREAD + 1];
00236 struct timeval time;
00237 struct timeval* timep = NULL;
00238 int lastFdPlusOne;
00239 int selectRes = 0;
00240 ssize_t numBytes = 0;
00241
00242 fd_set readfds;
00243 if(timeout > 0) {
00244 long i = (long)floor(timeout);
00245 time.tv_sec = i;
00246 time.tv_usec = (long)floor((timeout - i) * 1e6);
00247 timep = &time;
00248 }
00249
00250 lastFdPlusOne = termReader_getSocket(sr->superReader) + 1;
00251 FD_ZERO(&readfds);
00252 FD_SET(termReader_getSocket(sr->superReader), &readfds);
00253
00254 selectRes = select(lastFdPlusOne, &readfds, NULL, NULL, timep);
00255
00256 switch(selectRes) {
00257 case -1:
00258 perror("stringTermReader_fillData bad select");
00259 termReader_setError(sr->superReader, TERMREADER_SELECTERR);
00260 return;
00261 case 0:
00262 termReader_setError(sr->superReader, TERMREADER_TIMEOUT);
00263 return;
00264 default:
00265 #ifdef _WINDOWS
00266 numBytes = recv(termReader_getSocket(sr->superReader), buf, AMOUNTTOREAD, 0);
00267 #else
00268 numBytes = read(termReader_getSocket(sr->superReader), buf, AMOUNTTOREAD);
00269 #endif
00270
00271 switch(numBytes) {
00272 case -1:
00273 #ifndef _WINDOWS
00274
00275
00276
00277 perror("stringTermReader_fillData bad read");
00278 #endif
00279 termReader_setError(sr->superReader, TERMREADER_READERR);
00280 return;
00281 case 0:
00282 termReader_setError(sr->superReader, TERMREADER_NOCONN);
00283 return;
00284 }
00285 }
00286
00287 buf[numBytes] = '\0';
00288
00289 stringTermReader_addToCurrentBuffer(sr, buf, numBytes);
00290 termReader_setError(sr->superReader, TERMREADER_OKAY);
00291 }
00292
00293 void stringTermReader_addToCurrentBuffer(StringTermReader* sr, char* buf, ssize_t len)
00294 {
00295
00296
00297 size_t needCapacity = len + sr->bufUsed + 1;
00298
00299 if(needCapacity > sr->bufCapacity) {
00300 while(sr->bufCapacity < needCapacity) {
00301 sr->bufCapacity *= 2;
00302 }
00303 sr->currentBuffer = realloc(sr->currentBuffer, sr->bufCapacity);
00304 }
00305 else if(needCapacity < (sr->bufCapacity / 2)) {
00306 sr->bufCapacity /= 2;
00307 sr->currentBuffer = realloc(sr->currentBuffer, sr->bufCapacity);
00308 }
00309 else {
00310
00311 }
00312
00313 memcpy(sr->currentBuffer + sr->bufUsed, buf, len);
00314 sr->bufUsed += len;
00315 sr->currentBuffer[sr->bufUsed] = '\0';
00316 }