Article 6042 of alt.sources: Path: jac.zko.dec.com!pa.dec.com!decuac.dec.com!haven.umd.edu!ames!olivea!strobe!jerry From: jerry@strobe.ATC.Olivetti.Com (Jerry Aguirre) Newsgroups: news.software.nntp,alt.sources Subject: INN streaming patch Followup-To: news.software.nntp Date: 26 Jun 1995 20:53:51 GMT Organization: Olivetti ATC; Cupertino, CA; USA Lines: 2508 Message-ID: <3sn6ov$add@olivea.ATC.Olivetti.Com> References: <3sn5oi$6ce@olivea.ATC.Olivetti.Com> <3sn6cq$8v8@olivea.atc.olivetti.com> NNTP-Posting-Host: strobe.atc.olivetti.com Keywords: INN usenet nntp patch Xref: jac.zko.dec.com news.software.nntp:14388 alt.sources:6042 Here is an improved patch to modify INN 1.4 to support the "streaming" extensions to the NNTP protocol. When a modified innxmit is sending to a modified innd they negotiate use of the new protocol. The new protocol eliminates the effect of RTT on thruput by sending several articles before waiting for responses. This improves thruput especially on certain serial links. The is the second patch published and should be identified with the date 26Jun95. This patch corrects several syntactical problems experienced when compiling on certain systems. This patch also protects itself from duplicate IDs in the queue. Certain types of feeds are prone to generating duplicate entries in the feed. These could confuse previous versions as the ID could match the wrong entry in the internal array. Duplicate IDs are now detected and skipped. For those who the previous patch please note that only innxmit.c is different from the previous patch. I will post a separate diff of the previous patch to this one. Jerry Aguirre ===BEGIN PATCH=== *** innd/chan.c.orig Thu Mar 18 13:04:23 1993 --- innd/chan.c Mon Nov 21 11:12:47 1994 *************** *** 117,122 **** --- 117,123 ---- cp->In = in; cp->Out = out; cp->Tracing = Tracing; + cp->Sendid.Size=0; /* Make the descriptor close-on-exec and non-blocking. */ CloseOnExec(fd, TRUE); *************** *** 210,215 **** --- 211,220 ---- if (cp->Out.Size > BIG_BUFFER) { cp->Out.Size = 0; DISPOSE(cp->Out.Data); + } + if (cp->Sendid.Size) { + cp->Sendid.Size = 0; + DISPOSE(cp->Sendid.Data); } } *** innd/innd.h.orig Thu Mar 18 13:04:26 1993 --- innd/innd.h Thu Jul 28 18:55:25 1994 *************** *** 148,153 **** --- 148,154 ---- BUFFER In; BUFFER Out; BOOL Tracing; + BUFFER Sendid; } CHANNEL; *** doc/innxmit.8.orig Mon Jan 16 13:42:08 1995 --- doc/innxmit.8 Wed Jan 25 17:22:16 1995 *************** *** 11,16 **** --- 11,19 ---- .B \-a ] [ + .B \-c + ] + [ .B \-d ] [ *************** *** 20,25 **** --- 23,31 ---- .B \-r ] [ + .B \-s + ] + [ .BI \-t " timeout" ] [ *************** *** 150,155 **** --- 156,179 ---- The ``\-A'' flag may be used to specify an alternate spool directory to use if the article is not found; this would normally be an NFS-mounted spool directory of a master server with longer expiration times. + .PP + .I Innxmit + will attempt to negotiate a streaming mode extension of the NNTP + protocol with the server at connect time. + If successful it will use a slightly different protocol that enhances + throughput. + If the server does not recognize the streaming mode negotiation + .I innxmit + will revert to normal NNTP transfer mode. + Use the ``\-s'' flag to disable the attempt to negotiate the streaming + mode extension. + In streaming mode a check of each message ID is still made to avoid sending + articles already on the server. + The ``\-c'' flag will, if streaming mode is supported, + result in sending articles without checking. + This results in slightly greater throughput and may be appropriate when + it is known that the site could not already have the articles such as in + the case of a "leaf" site. .SH HISTORY Written by Rich $alz for InterNetNews. .de R$ *** backends/innxmit.c.orig Thu Mar 18 13:03:28 1993 --- backends/innxmit.c Mon Jun 26 13:05:57 1995 *************** *** 1,6 **** --- 1,7 ---- /* $Revision: 1.14 $ ** ** Transmit articles to remote site. + ** Modified for NNTP streaming: 26June95 Jerry Aguirre */ #include "configdata.h" #include *************** *** 39,45 **** --- 40,84 ---- #define OUTPUT_BUFFER_SIZE (16 * 1024) + /* Streaming extensions to NNTP. This extension removes the lock-step + ** limitation of conventional NNTP. Article transfer is several times + ** faster. Negotiated and falls back to old mode if receiver refuses. + */ + /* max number of articles that can be streamed ahead */ + #define STNBUF 32 + + /* Send "takethis" without "check" if this many articles were + ** accepted in a row. + */ + #define STNC 16 + + /* typical number of articles to stream */ + /* must be able to fopen this many articles */ + #define STNBUFL (STNBUF/2) + + /* number of retries before requeueing to disk */ + #define STNRETRY 5 + + struct stbufs { /* for each article we are procesing */ + char *st_fname; /* file name */ + char *st_id; /* message ID */ + int st_retry; /* retry count */ + int st_age; /* age count */ + QIOSTATE *st_qp; /* IO to read article contents */ + }; + static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */ + static int stnq; /* current number of active entries in stbuf */ + static int stahead; /* streaming mode "slow start" counter */ + static long stnofail; /* Count of consecutive successful sends */ + + static int TryStream = TRUE; /* Should attempt stream negotation? */ + static int CanStream = FALSE; /* Result of stream negotation */ + static int DoCheck = TRUE; /* Should check before takethis? */ + static char modestream[] = "mode stream"; + + + /* ** Syslog formats - collected together so they remain consistent */ *************** *** 51,56 **** --- 90,98 ---- STATIC char CANT_AUTHENTICATE[] = "%s authenticate failed %s"; STATIC char IHAVE_FAIL[] = "%s ihave failed %s"; + STATIC char CANT_FINDIT[] = "%s can't find %s"; + STATIC char CANT_PARSEIT[] = "%s can't parse ID %s"; + STATIC char UNEXPECTED[] = "%s unexpected response code %s"; /* ** Global variables. *************** *** 83,88 **** --- 125,132 ---- STATIC unsigned long STAToffered; STATIC unsigned long STATrefused; STATIC unsigned long STATrejected; + STATIC char *AltSpool; + STATIC char *AltPath; /* *************** *** 171,189 **** { int i; i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer)); REMbuffptr = REMbuffer; return i < 0 ? FALSE : TRUE; } /* ** Send a line to the server, adding the dot escape and \r\n. */ STATIC BOOL ! REMwrite(p, i) register char *p; register int i; { static char HDR[] = "Content-Transfer-Encoding:"; static char COD[] = --- 215,308 ---- { int i; + if (REMbuffptr == REMbuffer) return TRUE; /* nothing buffered */ i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer)); REMbuffptr = REMbuffer; return i < 0 ? FALSE : TRUE; } + /* + ** Return TRUE if the MessageID duplicates an existing entry. + ** Duplicates sometime get into the queue and this greatly confuses + ** the protocol. + */ + STATIC BOOL + stisdup(MessageID) + char *MessageID; + { + register int i; + for (i = 0; i < STNBUF; i++) { /* linear search for ID */ + if ((stbuf[i].st_id) && (stbuf[i].st_id[0]) + && (0 == strcasecmp(MessageID, stbuf[i].st_id))) { + register int n; + + for (n = 0; (MessageID[n] != '@') && (MessageID[n] != '\0'); n++) ; + /* left of '@' is case sensitive */ + if (strncmp(MessageID, stbuf[i].st_id, n)) continue; + else break; /* found a match */ + } + } + return (i < STNBUF); /* true if we found an existing entry */ + } + + /* stalloc(): save path, ID, and qp into one of the streaming mode entries */ + STATIC int + stalloc(Article, MessageID, qp) + char *Article; + char *MessageID; + register QIOSTATE *qp; + { + register int i; + + for (i = 0; i < STNBUF; i++) { + if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break; + } + if (i >= STNBUF) { /* stnq says not full but can not find unused */ + syslog(L_ERROR, "stalloc: Internal error"); + return (-1); + } + if ((int)strlen(Article) >= SPOOLNAMEBUFF) { + syslog(L_ERROR, "stalloc: filename longer than %d", SPOOLNAMEBUFF); + return (TRUE); + } + /* allocate buffers on first use. + ** If filename ever is longer than SPOOLNAMEBUFF then code will abort. + ** If ID is ever longer than NNTP_STRLEN then other code would break. + */ + if (!stbuf[i].st_fname) stbuf[i].st_fname = NEW(char, SPOOLNAMEBUFF); + if (!stbuf[i].st_id) stbuf[i].st_id = NEW(char, NNTP_STRLEN); + (void)strcpy(stbuf[i].st_fname, Article); + (void)strcpy(stbuf[i].st_id, MessageID); + stbuf[i].st_qp = qp; + stbuf[i].st_retry = 0; + stbuf[i].st_age = 0; + stnq++; + return i; + } + + /* strel(): release for reuse one of the streaming mode entries */ + STATIC void + strel(i) + int i; + { + if (stbuf[i].st_qp) { + QIOclose(stbuf[i].st_qp); + stbuf[i].st_qp = 0; + } + if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0'; + if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0'; + stnq--; + } + /* ** Send a line to the server, adding the dot escape and \r\n. */ STATIC BOOL ! REMwrite(p, i, escdot) register char *p; register int i; + register BOOL escdot; { static char HDR[] = "Content-Transfer-Encoding:"; static char COD[] = *************** *** 212,218 **** } /* Dot escape, text of the line, line terminator. */ ! if (*p == '.') *REMbuffptr++ = '.'; if (i > MEMCPY_THRESHOLD) { (void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i); --- 331,337 ---- } /* Dot escape, text of the line, line terminator. */ ! if (escdot && (*p == '.')) *REMbuffptr++ = '.'; if (i > MEMCPY_THRESHOLD) { (void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i); *************** *** 304,310 **** double systime; if (!Purging) { ! (void)REMwrite(QUIT, STRLEN(QUIT)); (void)REMflush(); } (void)GetTimeInfo(&Now); --- 423,429 ---- double systime; if (!Purging) { ! (void)REMwrite(QUIT, STRLEN(QUIT), FALSE); (void)REMflush(); } (void)GetTimeInfo(&Now); *************** *** 342,348 **** CloseAndRename() { /* Close the files, rename the temporary. */ ! QIOclose(BATCHqp); if (ferror(BATCHfp) || fflush(BATCHfp) == EOF || fclose(BATCHfp) == EOF) { --- 461,470 ---- CloseAndRename() { /* Close the files, rename the temporary. */ ! if (BATCHqp) { ! QIOclose(BATCHqp); ! BATCHqp = NULL; ! } if (ferror(BATCHfp) || fflush(BATCHfp) == EOF || fclose(BATCHfp) == EOF) { *************** *** 411,442 **** } (void)fprintf(stderr, "Rewriting batch file and exiting.\n"); Requeue(Article, MessageID); ! for ( ; ; ) { ! if ((p = QIOread(BATCHqp)) == NULL) { ! if (QIOerror(BATCHqp)) { ! (void)fprintf(stderr, "Can't read \"%s\", %s\n", ! BATCHname, strerror(errno)); ! ExitWithStats(1); } - if (QIOtoolong(BATCHqp)) { - (void)fprintf(stderr, "Skipping long line in \"%s\".\n", - BATCHname); - (void)QIOread(BATCHqp); - continue; - } ! /* Normal EOF. */ ! break; } - - if (fprintf(BATCHfp, "%s\n", p) == EOF - || ferror(BATCHfp)) { - (void)fprintf(stderr, "Can't requeue \"%s\", %s\n", - p, strerror(errno)); - ExitWithStats(1); - } } CloseAndRename(); --- 533,580 ---- } (void)fprintf(stderr, "Rewriting batch file and exiting.\n"); + if (CanStream) { /* streaming mode has a buffer of articles */ + register int i; + + for (i = 0; i < STNBUF; i++) { /* requeue unacknowledged articles */ + if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) { + if (Debug) + (void)fprintf(stderr, "stbuf[%d]= %s, %s\n", + i, stbuf[i].st_fname, stbuf[i].st_id); + Requeue(stbuf[i].st_fname, stbuf[i].st_id); + if (Article == stbuf[i].st_fname) Article = NULL; + strel(i); /* release entry */ + } + } + } Requeue(Article, MessageID); ! if (BATCHqp) { ! for ( ; ; ) { ! if ((p = QIOread(BATCHqp)) == NULL) { ! if (QIOtoolong(BATCHqp)) { ! (void)fprintf(stderr, "Skipping long line in \"%s\".\n", ! BATCHname); ! (void)QIOread(BATCHqp); ! continue; ! } ! if (QIOerror(BATCHqp)) { ! (void)fprintf(stderr, "Can't read \"%s\", %s\n", ! BATCHname, strerror(errno)); ! ExitWithStats(1); ! } ! ! /* Normal EOF. */ ! break; } ! if (fprintf(BATCHfp, "%s\n", p) == EOF ! || ferror(BATCHfp)) { ! (void)fprintf(stderr, "Can't requeue \"%s\", %s\n", ! p, strerror(errno)); ! ExitWithStats(1); ! } } } CloseAndRename(); *************** *** 569,584 **** for (InHeaders = TRUE; ; ) { if ((p = QIOread(qp)) == NULL) { if (QIOerror(qp)) { (void)fprintf(stderr, "Can't read \"%s\", %s\n", Article, strerror(errno)); return FALSE; } - if (QIOtoolong(qp)) { - (void)fprintf(stderr, "Line too long in \"%s\"\n", Article); - (void)QIOread(BATCHqp); - continue; - } /* Normal EOF. */ break; --- 707,722 ---- for (InHeaders = TRUE; ; ) { if ((p = QIOread(qp)) == NULL) { + if (QIOtoolong(qp)) { + (void)fprintf(stderr, "Line too long in \"%s\"\n", Article); + (void)QIOread(qp); + continue; + } if (QIOerror(qp)) { (void)fprintf(stderr, "Can't read \"%s\", %s\n", Article, strerror(errno)); return FALSE; } /* Normal EOF. */ break; *************** *** 587,593 **** InHeaders = FALSE; if (InHeaders || MimeArticle == MTnotmime) { ! if (!REMwrite(p, QIOlength(qp))) { (void)fprintf(stderr, "Can't send \"%s\", %s\n", Article, strerror(errno)); return FALSE; --- 725,731 ---- InHeaders = FALSE; if (InHeaders || MimeArticle == MTnotmime) { ! if (!REMwrite(p, QIOlength(qp), TRUE)) { (void)fprintf(stderr, "Can't send \"%s\", %s\n", Article, strerror(errno)); return FALSE; *************** *** 612,619 **** if (GotInterrupt) Interrupted(Article, MessageID); } ! if (!REMflush()) { ! (void)fprintf(stderr, "Can't end \"%s\", %s\n", Article, strerror(errno)); return FALSE; } --- 750,758 ---- if (GotInterrupt) Interrupted(Article, MessageID); } ! /* Write the terminator. */ ! if (!REMwrite(".", 1, FALSE)) { ! (void)fprintf(stderr, "Can't send \"%s\", %s\n", Article, strerror(errno)); return FALSE; } *************** *** 620,637 **** if (Debug) (void)fprintf(stderr, "> [ article ]%s\n", MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)"); - - /* Write the terminator. */ - if (write(ToServer, TERM, STRLEN(TERM)) != STRLEN(TERM)) { - (void)fprintf(stderr, "Can't end \"%s\", %s\n", - Article, strerror(errno)); - return FALSE; - } if (GotInterrupt) Interrupted(Article, MessageID); if (Debug) (void)fprintf(stderr, "> .\n"); /* What did the remote site say? */ if (!REMread(buff, (int)sizeof buff)) { (void)fprintf(stderr, "No reply after sending \"%s\", %s\n", --- 759,771 ---- if (Debug) (void)fprintf(stderr, "> [ article ]%s\n", MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)"); if (GotInterrupt) Interrupted(Article, MessageID); if (Debug) (void)fprintf(stderr, "> .\n"); + if (CanStream) return TRUE; /* streaming mode does not wait for ACK */ + /* What did the remote site say? */ if (!REMread(buff, (int)sizeof buff)) { (void)fprintf(stderr, "No reply after sending \"%s\", %s\n", *************** *** 773,779 **** --- 907,1095 ---- longjmp(JMPwhere, 1); } + /* check articles in streaming NNTP mode + ** return TRUE on failure. + */ + STATIC BOOL + check(i) + int i; /* index of stbuf to send check for */ + { + char buff[NNTP_STRLEN]; + /* send "check " to the other system */ + (void)sprintf(buff, "check %s", stbuf[i].st_id); + if (!REMwrite(buff, (int)strlen(buff), FALSE)) { + (void)fprintf(stderr, "Can't check article, %s\n", + strerror(errno)); + return TRUE; + } + STAToffered++; + if (stahead > 0) stahead--; + if (Debug) { + if (stbuf[i].st_retry) + (void)fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry); + else + (void)fprintf(stderr, "> %s\n", buff); + } + if (GotInterrupt) + Interrupted(stbuf[i].st_fname, stbuf[i].st_id); + + /* That all. Response is checked later by strlisten() */ + return FALSE; + } + + /* Send article in "takethis streaming NNTP mode. + ** return TRUE on failure. + */ + STATIC BOOL + takethis(i) + int i; /* index to stbuf to be sent */ + { + char buff[NNTP_STRLEN]; + + if (!stbuf[i].st_qp) { /* should already be open but ... */ + /* Open the article. */ + if (!(stbuf[i].st_qp = QIOopen(stbuf[i].st_fname, QIO_BUFFER))) { + /* can not open it. Should check AltPath */ + if (AltPath && (*(stbuf[i].st_fname) != '/')) { + (void)sprintf(AltPath, "%s/%s", AltSpool, stbuf[i].st_fname); + stbuf[i].st_qp = QIOopen(AltPath, QIO_BUFFER); + } + if (!(stbuf[i].st_qp)) { + strel(i); + return FALSE; /* Not an error. Could be canceled or expired */ + } + } + } + /* send "takethis " to the other system */ + (void)sprintf(buff, "takethis %s", stbuf[i].st_id); + if (!REMwrite(buff, (int)strlen(buff), FALSE)) { + (void)fprintf(stderr, "Can't send takethis , %s\n", + strerror(errno)); + return TRUE; + } + if (Debug) + (void)fprintf(stderr, "> %s\n", buff); + if (GotInterrupt) + Interrupted((char *)0, (char *)0); + if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id, + stbuf[i].st_qp)) + return TRUE; + QIOclose(stbuf[i].st_qp); /* should not need file again */ + stbuf[i].st_qp = 0; /* so close to free descriptor */ + stbuf[i].st_age = 0; + if (stahead < STNBUFL) stahead++; + /* That all. Response is checked later by strlisten() */ + return FALSE; + } + + + /* listen for responses. Process acknowledgments to remove items from + ** the queue. Also sends the articles on request. Returns TRUE on error. + ** return TRUE on failure. + */ + STATIC BOOL + strlisten() + { + int resp; + int i; + char *id, *p; + char buff[NNTP_STRLEN]; + + while(TRUE) { + if (!REMread(buff, (int)sizeof buff)) { + (void)fprintf(stderr, "No reply to check, %s\n", strerror(errno)); + return TRUE; + } + if (GotInterrupt) + Interrupted((char *)0, (char *)0); + if (Debug) + (void)fprintf(stderr, "< %s", buff); + + /* Parse the reply. */ + resp = atoi(buff); + /* Skip the 1XX informational messages */ + if ((resp >= 100) && (resp < 200)) continue; + switch (resp) { /* first time is to verify it */ + case NNTP_ERR_GOTID_VAL: + case NNTP_OK_SENDID_VAL: + case NNTP_OK_RECID_VAL: + case NNTP_ERR_FAILID_VAL: + case NNTP_RESENDID_VAL: + if (id = strchr(buff, '<')) { + p = strchr(id, '>'); + if (p) *(p+1) = '\0'; + for (i = 0; i < STNBUF; i++) { /* linear search for ID */ + if ((stbuf[i].st_id) && (stbuf[i].st_id[0]) + && (0 == strcasecmp(id, stbuf[i].st_id))) { + register int n; + + for (n = 0; (id[n] != '@') && (id[n] != '\0'); n++) ; + /* left of '@' is case sensitive */ + if (strncmp(id, stbuf[i].st_id, n)) continue; + else break; /* found a match */ + } + } + if (i >= STNBUF) { /* should not happen */ + syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff)); + return (TRUE); /* can't find it! */ + } + } else { + syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff)); + return (TRUE); + } + break; + case NNTP_GOODBYE_VAL: + /* Most likely out of space -- no point in continuing. */ + syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff)); + return TRUE; + /* NOTREACHED */ + default: + syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff)); + if (Debug) + (void)fprintf(stderr, "Unknown reply \"%s\"", + buff); + return (TRUE); + } + switch (resp) { /* now we take some action */ + case NNTP_RESENDID_VAL: /* remote wants it later */ + /* try again now because time has passed */ + if (stbuf[i].st_retry < STNRETRY) { + if (check(i)) return TRUE; + stbuf[i].st_retry++; + stbuf[i].st_age = 0; + } else { /* requeue to disk for later */ + Requeue(stbuf[i].st_fname, stbuf[i].st_id); + strel(i); /* release entry */ + } + break; + case NNTP_ERR_GOTID_VAL: /* remote doesn't want it */ + strel(i); /* release entry */ + STATrefused++; + stnofail = 0; + break; + + case NNTP_OK_SENDID_VAL: /* remote wants article */ + if (takethis(i)) return TRUE; + stnofail++; + break; + + case NNTP_OK_RECID_VAL: /* remote received it OK */ + strel(i); /* release entry */ + STATaccepted++; + break; + + case NNTP_ERR_FAILID_VAL: + strel(i); /* release entry */ + STATrejected++; + stnofail = 0; + break; + } + break; + } + return (FALSE); + } + /* ** Print a usage message and exit. */ *************** *** 781,787 **** Usage() { (void)fprintf(stderr, ! "Usage: innxmit [-a] [-d] [-M] [-p] [-r] [-S] [-t#] [-T#] host file\n"); exit(1); } --- 1097,1103 ---- Usage() { (void)fprintf(stderr, ! "Usage: innxmit [-a] [-c] [-d] [-M] [-p] [-r] [-s] [-S] [-t#] [-T#] host file\n"); exit(1); } *************** *** 801,812 **** FILE *From; FILE *To; char buff[NNTP_STRLEN]; - char *AltSpool; char *Article; char *ContentEncoding; char *ContentType; char *MessageID; - char *AltPath; SIGHANDLER (*old)(); unsigned int ConnectTimeout; unsigned int TotalTimeout; --- 1117,1126 ---- *************** *** 818,824 **** (void)umask(NEWSUMASK); /* Parse JCL. */ ! while ((i = getopt(ac, av, "A:adMprSt:T:v")) != EOF) switch (i) { default: Usage(); --- 1132,1138 ---- (void)umask(NEWSUMASK); /* Parse JCL. */ ! while ((i = getopt(ac, av, "A:acdMprsSt:T:v")) != EOF) switch (i) { default: Usage(); *************** *** 830,835 **** --- 1144,1152 ---- case 'a': AlwaysRewrite = TRUE; break; + case 'c': + DoCheck = FALSE; + break; case 'd': Debug = TRUE; break; *************** *** 843,848 **** --- 1160,1168 ---- case 'r': DoRequeue = FALSE; break; + case 's': + TryStream = FALSE; + break; case 'S': Slavish = TRUE; break; *************** *** 957,962 **** --- 1277,1325 ---- /* We no longer need standard I/O. */ FromServer = fileno(From); ToServer = fileno(To); + + if (TryStream) { + if (!REMwrite(modestream, (int)strlen(modestream), FALSE)) { + (void)fprintf(stderr, "Can't negotiate %s, %s\n", + modestream, strerror(errno)); + } + if (Debug) + (void)fprintf(stderr, ">%s\n", modestream); + /* Does he understand mode stream? */ + if (!REMread(buff, (int)sizeof buff)) { + (void)fprintf(stderr, "No reply to %s, %s\n", + modestream, strerror(errno)); + } else { + if (Debug) + (void)fprintf(stderr, "< %s", buff); + + /* Parse the reply. */ + switch (atoi(buff)) { + default: + (void)fprintf(stderr, "Unknown reply to \"%s\" -- %s", + modestream, buff); + CanStream = FALSE; + break; + case NNTP_OK_STREAM_VAL: /* YES! */ + CanStream = TRUE; + break; + case NNTP_BAD_COMMAND_VAL: /* normal refusal */ + CanStream = FALSE; + break; + } + } + if (CanStream) { + int i; + + for (i = 0; i < STNBUF; i++) { /* reset buffers */ + stbuf[i].st_fname = 0; + stbuf[i].st_id = 0; + stbuf[i].st_qp = 0; + } + stnq = 0; + stahead = 0; + } + } } /* Set up signal handlers. */ *************** *** 982,992 **** Interrupted(Article, MessageID); if ((Article = QIOread(BATCHqp)) == NULL) { - if (QIOerror(BATCHqp)) { - (void)fprintf(stderr, "Can't read \"%s\", %s\n", - BATCHname, strerror(errno)); - ExitWithStats(1); - } if (QIOtoolong(BATCHqp)) { (void)fprintf(stderr, "Skipping long line in \"%s\"\n", BATCHname); --- 1345,1350 ---- *************** *** 993,1001 **** --- 1351,1365 ---- (void)QIOread(BATCHqp); continue; } + if (QIOerror(BATCHqp)) { + (void)fprintf(stderr, "Can't read \"%s\", %s\n", + BATCHname, strerror(errno)); + ExitWithStats(1); + } /* Normal EOF -- we're done. */ QIOclose(BATCHqp); + BATCHqp = NULL; break; } *************** *** 1100,1107 **** Interrupted(Article, MessageID); /* Offer the article. */ (void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID); ! if (!REMwrite(buff, (int)strlen(buff))) { (void)fprintf(stderr, "Can't offer article, %s\n", strerror(errno)); QIOclose(qp); --- 1464,1519 ---- Interrupted(Article, MessageID); /* Offer the article. */ + if (CanStream) { + int i; + + if (stisdup(MessageID)) { /* skip duplicates in queue */ + if (Debug) + (void)fprintf(stderr, "Skipping duplicate ID %s\n", + MessageID); + QIOclose(qp); + continue; + } + while (stnq >= STNBUFL) { /* need to empty a buffer */ + if (strlisten()) { + RequeueRestAndExit(Article, MessageID); + } + if ((stahead > 0) && (stnq < STNBUF)) break; + } + /* save new article in the buffer */ + i = stalloc(Article, MessageID, qp); + if (i < 0) { + QIOclose(qp); + RequeueRestAndExit(Article, MessageID); + } + if (DoCheck && (stnofail < STNC)) { + if (check(i)) { + RequeueRestAndExit((char *)NULL, (char *)NULL); + } + } else { + if (takethis(i)) { + RequeueRestAndExit((char *)NULL, (char *)NULL); + } + } + for (i = 0; i < STNBUF; i++) { + if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) { + if (stbuf[i].st_age++ > stnq) { + /* This should not happen but just in case ... */ + if (stbuf[i].st_retry < STNRETRY) { + if (check(i)) return TRUE; /* resend check */ + stbuf[i].st_retry++; + stbuf[i].st_age = 0; + } else { /* requeue to disk for later */ + Requeue(stbuf[i].st_fname, stbuf[i].st_id); + strel(i); /* release entry */ + } + } + } + } + continue; /* next article */ + } (void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID); ! if (!REMwrite(buff, (int)strlen(buff), FALSE)) { (void)fprintf(stderr, "Can't offer article, %s\n", strerror(errno)); QIOclose(qp); *************** *** 1154,1159 **** --- 1566,1578 ---- } QIOclose(qp); + } + if (CanStream) { /* need to wait for rest of ACKs */ + while (stnq > 0) { + if (strlisten()) { + RequeueRestAndExit((char *)NULL, (char *)NULL); + } + } } if (BATCHfp != NULL) *** backends/innxmit.c.orig Thu Mar 18 13:03:28 1993 --- backends/innxmit.c Fri Feb 10 23:32:11 1995 *************** *** 39,45 **** --- 39,80 ---- #define OUTPUT_BUFFER_SIZE (16 * 1024) + /* Streaming extensions to NNTP. This extension removes the lock-step + ** limitation of conventional NNTP. Article transfer is several times + ** faster. Negotiated and falls back to old mode if receiver refuses. + */ + /* max number of articles that can be streamed ahead */ + #define STNBUF 16 + /* Send "takethis" without "check" if this many articles were + ** accepted in a row. + */ + #define STNC 20 + /* typical number of articles to stream */ + /* must be able to fopen this many articles */ + #define STNBUFL (STNBUF/2) + /* number of retries before requeueing to disk */ + #define STNRETRY 5 + + struct stbufs { /* for each article we are procesing */ + char *st_fname; /* file name */ + char *st_id; /* message ID */ + int st_retry; /* retry count */ + int st_age; /* age count */ + QIOSTATE *st_qp; /* IO to read article contents */ + }; + static struct stbufs stbuf[STNBUF]; /* we keep track of this many articles */ + static int stnq; /* current number of active entries in stbuf */ + static int stahead; /* streaming mode "slow start" counter */ + static long stnofail; /* Count of consecutive successful sends */ + + static int TryStream = TRUE; /* Should attempt stream negotation? */ + static int CanStream = FALSE; /* Result of stream negotation */ + static int DoCheck = TRUE; /* Should check before takethis? */ + static char modestream[] = "mode stream"; + + + /* ** Syslog formats - collected together so they remain consistent */ *************** *** 51,56 **** --- 86,94 ---- STATIC char CANT_AUTHENTICATE[] = "%s authenticate failed %s"; STATIC char IHAVE_FAIL[] = "%s ihave failed %s"; + STATIC char CANT_FINDIT[] = "%s can't find %s"; + STATIC char CANT_PARSEIT[] = "%s can't parse ID %s"; + STATIC char UNEXPECTED[] = "%s unexpected response code %s"; /* ** Global variables. *************** *** 83,88 **** --- 121,128 ---- STATIC unsigned long STAToffered; STATIC unsigned long STATrefused; STATIC unsigned long STATrejected; + STATIC char *AltSpool; + STATIC char *AltPath; /* *************** *** 171,189 **** { int i; i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer)); REMbuffptr = REMbuffer; return i < 0 ? FALSE : TRUE; } /* ** Send a line to the server, adding the dot escape and \r\n. */ STATIC BOOL ! REMwrite(p, i) register char *p; register int i; { static char HDR[] = "Content-Transfer-Encoding:"; static char COD[] = --- 211,279 ---- { int i; + if (REMbuffptr == REMbuffer) return TRUE; /* nothing buffered */ i = xwrite(ToServer, REMbuffer, (int)(REMbuffptr - REMbuffer)); REMbuffptr = REMbuffer; return i < 0 ? FALSE : TRUE; } + /* stalloc(): save path, ID, and qp into one of the streaming mode entries */ + STATIC int + stalloc(Article, MessageID, qp) + char *Article; + char *MessageID; + register QIOSTATE *qp; + { + register int i; + for (i = 0; i < STNBUF; i++) { + if ((!stbuf[i].st_fname) || (stbuf[i].st_fname[0] == '\0')) break; + } + if (i >= STNBUF) { /* stnq says not full but can not find unused */ + syslog(L_ERROR, "check: Internal error"); + return (-1); + } + if ((int)strlen(Article) >= SPOOLNAMEBUFF) { + syslog(L_ERROR, "check: filename longer than %d", SPOOLNAMEBUFF); + return (TRUE); + } + /* allocate buffers on first use. + ** If filename ever is longer than SPOOLNAMEBUFF then code will abort. + ** If ID is ever longer than NNTP_STRLEN then other code would break. + */ + if (!stbuf[i].st_fname) stbuf[i].st_fname = NEW(char, SPOOLNAMEBUFF); + if (!stbuf[i].st_id) stbuf[i].st_id = NEW(char, NNTP_STRLEN); + (void)strcpy(stbuf[i].st_fname, Article); + (void)strcpy(stbuf[i].st_id, MessageID); + stbuf[i].st_qp = qp; + stbuf[i].st_retry = 0; + stbuf[i].st_age = 0; + stnq++; + return i; + } + + /* strel(): release for reuse one of the streaming mode entries */ + STATIC void + strel(i) + int i; + { + if (stbuf[i].st_qp) { + QIOclose(stbuf[i].st_qp); + stbuf[i].st_qp = 0; + } + if (stbuf[i].st_id) stbuf[i].st_id[0] = '\0'; + if (stbuf[i].st_fname) stbuf[i].st_fname[0] = '\0'; + stnq--; + } + /* ** Send a line to the server, adding the dot escape and \r\n. */ STATIC BOOL ! REMwrite(p, i, escdot) register char *p; register int i; + register BOOL escdot; { static char HDR[] = "Content-Transfer-Encoding:"; static char COD[] = *************** *** 212,218 **** } /* Dot escape, text of the line, line terminator. */ ! if (*p == '.') *REMbuffptr++ = '.'; if (i > MEMCPY_THRESHOLD) { (void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i); --- 302,308 ---- } /* Dot escape, text of the line, line terminator. */ ! if (escdot && (*p == '.')) *REMbuffptr++ = '.'; if (i > MEMCPY_THRESHOLD) { (void)memcpy((POINTER)REMbuffptr, (POINTER)p, (SIZE_T)i); *************** *** 304,310 **** double systime; if (!Purging) { ! (void)REMwrite(QUIT, STRLEN(QUIT)); (void)REMflush(); } (void)GetTimeInfo(&Now); --- 394,400 ---- double systime; if (!Purging) { ! (void)REMwrite(QUIT, STRLEN(QUIT), FALSE); (void)REMflush(); } (void)GetTimeInfo(&Now); *************** *** 342,348 **** CloseAndRename() { /* Close the files, rename the temporary. */ ! QIOclose(BATCHqp); if (ferror(BATCHfp) || fflush(BATCHfp) == EOF || fclose(BATCHfp) == EOF) { --- 432,441 ---- CloseAndRename() { /* Close the files, rename the temporary. */ ! if (BATCHqp) { ! QIOclose(BATCHqp); ! BATCHqp = NULL; ! } if (ferror(BATCHfp) || fflush(BATCHfp) == EOF || fclose(BATCHfp) == EOF) { *************** *** 411,442 **** } (void)fprintf(stderr, "Rewriting batch file and exiting.\n"); Requeue(Article, MessageID); ! for ( ; ; ) { ! if ((p = QIOread(BATCHqp)) == NULL) { ! if (QIOerror(BATCHqp)) { ! (void)fprintf(stderr, "Can't read \"%s\", %s\n", ! BATCHname, strerror(errno)); ! ExitWithStats(1); } - if (QIOtoolong(BATCHqp)) { - (void)fprintf(stderr, "Skipping long line in \"%s\".\n", - BATCHname); - (void)QIOread(BATCHqp); - continue; - } ! /* Normal EOF. */ ! break; } - - if (fprintf(BATCHfp, "%s\n", p) == EOF - || ferror(BATCHfp)) { - (void)fprintf(stderr, "Can't requeue \"%s\", %s\n", - p, strerror(errno)); - ExitWithStats(1); - } } CloseAndRename(); --- 504,551 ---- } (void)fprintf(stderr, "Rewriting batch file and exiting.\n"); + if (CanStream) { /* streaming mode has a buffer of articles */ + register int i; + + for (i = 0; i < STNBUF; i++) { /* requeue unacknowledged articles */ + if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) { + if (Debug) + (void)fprintf(stderr, "stbuf[%d]= %s, %s\n", + i, stbuf[i].st_fname, stbuf[i].st_id); + Requeue(stbuf[i].st_fname, stbuf[i].st_id); + if (Article == stbuf[i].st_fname) Article = NULL; + strel(i); /* release entry */ + } + } + } Requeue(Article, MessageID); ! if (BATCHqp) { ! for ( ; ; ) { ! if ((p = QIOread(BATCHqp)) == NULL) { ! if (QIOtoolong(BATCHqp)) { ! (void)fprintf(stderr, "Skipping long line in \"%s\".\n", ! BATCHname); ! (void)QIOread(BATCHqp); ! continue; ! } ! if (QIOerror(BATCHqp)) { ! (void)fprintf(stderr, "Can't read \"%s\", %s\n", ! BATCHname, strerror(errno)); ! ExitWithStats(1); ! } ! ! /* Normal EOF. */ ! break; } ! if (fprintf(BATCHfp, "%s\n", p) == EOF ! || ferror(BATCHfp)) { ! (void)fprintf(stderr, "Can't requeue \"%s\", %s\n", ! p, strerror(errno)); ! ExitWithStats(1); ! } } } CloseAndRename(); *************** *** 569,584 **** for (InHeaders = TRUE; ; ) { if ((p = QIOread(qp)) == NULL) { if (QIOerror(qp)) { (void)fprintf(stderr, "Can't read \"%s\", %s\n", Article, strerror(errno)); return FALSE; } - if (QIOtoolong(qp)) { - (void)fprintf(stderr, "Line too long in \"%s\"\n", Article); - (void)QIOread(BATCHqp); - continue; - } /* Normal EOF. */ break; --- 678,693 ---- for (InHeaders = TRUE; ; ) { if ((p = QIOread(qp)) == NULL) { + if (QIOtoolong(qp)) { + (void)fprintf(stderr, "Line too long in \"%s\"\n", Article); + (void)QIOread(qp); + continue; + } if (QIOerror(qp)) { (void)fprintf(stderr, "Can't read \"%s\", %s\n", Article, strerror(errno)); return FALSE; } /* Normal EOF. */ break; *************** *** 587,593 **** InHeaders = FALSE; if (InHeaders || MimeArticle == MTnotmime) { ! if (!REMwrite(p, QIOlength(qp))) { (void)fprintf(stderr, "Can't send \"%s\", %s\n", Article, strerror(errno)); return FALSE; --- 696,702 ---- InHeaders = FALSE; if (InHeaders || MimeArticle == MTnotmime) { ! if (!REMwrite(p, QIOlength(qp), TRUE)) { (void)fprintf(stderr, "Can't send \"%s\", %s\n", Article, strerror(errno)); return FALSE; *************** *** 612,619 **** if (GotInterrupt) Interrupted(Article, MessageID); } ! if (!REMflush()) { ! (void)fprintf(stderr, "Can't end \"%s\", %s\n", Article, strerror(errno)); return FALSE; } --- 721,729 ---- if (GotInterrupt) Interrupted(Article, MessageID); } ! /* Write the terminator. */ ! if (!REMwrite(".", 1, FALSE)) { ! (void)fprintf(stderr, "Can't send \"%s\", %s\n", Article, strerror(errno)); return FALSE; } *************** *** 620,637 **** if (Debug) (void)fprintf(stderr, "> [ article ]%s\n", MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)"); - - /* Write the terminator. */ - if (write(ToServer, TERM, STRLEN(TERM)) != STRLEN(TERM)) { - (void)fprintf(stderr, "Can't end \"%s\", %s\n", - Article, strerror(errno)); - return FALSE; - } if (GotInterrupt) Interrupted(Article, MessageID); if (Debug) (void)fprintf(stderr, "> .\n"); /* What did the remote site say? */ if (!REMread(buff, (int)sizeof buff)) { (void)fprintf(stderr, "No reply after sending \"%s\", %s\n", --- 730,742 ---- if (Debug) (void)fprintf(stderr, "> [ article ]%s\n", MimeArticle == MTnotmime ? "" : " (Mime: quoted-printable)"); if (GotInterrupt) Interrupted(Article, MessageID); if (Debug) (void)fprintf(stderr, "> .\n"); + if (CanStream) return TRUE; /* streaming mode does not wait for ACK */ + /* What did the remote site say? */ if (!REMread(buff, (int)sizeof buff)) { (void)fprintf(stderr, "No reply after sending \"%s\", %s\n", *************** *** 774,779 **** --- 879,1059 ---- } + /* listen for responses. Process acknowledgments to remove items from + ** the queue. Also sends the articles on request. Returns TRUE on error. + ** return TRUE on failure. + */ + STATIC BOOL + listen() + { + int resp; + int i; + char *id, *p; + char buff[NNTP_STRLEN]; + + while(TRUE) { + if (!REMread(buff, (int)sizeof buff)) { + (void)fprintf(stderr, "No reply to check, %s\n", strerror(errno)); + return TRUE; + } + if (GotInterrupt) + Interrupted((char *)0, (char *)0); + if (Debug) + (void)fprintf(stderr, "< %s", buff); + + /* Parse the reply. */ + resp = atoi(buff); + /* Skip the 1XX informational messages */ + if ((resp >= 100) && (resp < 200)) continue; + switch (resp) { /* first time is to verify it */ + case NNTP_ERR_GOTID_VAL: + case NNTP_OK_SENDID_VAL: + case NNTP_OK_RECID_VAL: + case NNTP_ERR_FAILID_VAL: + case NNTP_RESENDID_VAL: + if (id = strchr(buff, '<')) { + p = strchr(id, '>'); + if (p) *(p+1) = '\0'; + for (i = 0; i < STNBUF; i++) { /* linear search for ID */ + if ((stbuf[i].st_id) && (stbuf[i].st_id[0]) + && (0 == strcasecmp(id, stbuf[i].st_id))) break; + } + if (i >= STNBUF) { /* should not happen */ + syslog(L_NOTICE, CANT_FINDIT, REMhost, REMclean(buff)); + return (TRUE); /* can't find it! */ + } + } else { + syslog(L_NOTICE, CANT_PARSEIT, REMhost, REMclean(buff)); + return (TRUE); + } + break; + case NNTP_GOODBYE_VAL: + /* Most likely out of space -- no point in continuing. */ + syslog(L_NOTICE, IHAVE_FAIL, REMhost, REMclean(buff)); + return TRUE; + /* NOTREACHED */ + default: + syslog(L_NOTICE, UNEXPECTED, REMhost, REMclean(buff)); + if (Debug) + (void)fprintf(stderr, "Unknown reply \"%s\"", + buff); + return (TRUE); + } + switch (resp) { /* now we take some action */ + case NNTP_RESENDID_VAL: /* remote wants it later */ + /* try again now because time has passed */ + if (stbuf[i].st_retry < STNRETRY) { + if (check(i)) return TRUE; + stbuf[i].st_retry++; + stbuf[i].st_age = 0; + } else { /* requeue to disk for later */ + Requeue(stbuf[i].st_fname, stbuf[i].st_id); + strel(i); /* release entry */ + } + break; + case NNTP_ERR_GOTID_VAL: /* remote doesn't want it */ + strel(i); /* release entry */ + STATrefused++; + stnofail = 0; + break; + + case NNTP_OK_SENDID_VAL: /* remote wants article */ + if (takethis(i)) return TRUE; + stnofail++; + break; + + case NNTP_OK_RECID_VAL: /* remote received it OK */ + strel(i); /* release entry */ + STATaccepted++; + break; + + case NNTP_ERR_FAILID_VAL: + strel(i); /* release entry */ + STATrejected++; + stnofail = 0; + break; + } + break; + } + return (FALSE); + } + + /* check articles in streaming NNTP mode + ** return TRUE on failure. + */ + STATIC BOOL + check(i) + int i; /* index of stbuf to send check for */ + { + char buff[NNTP_STRLEN]; + + /* send "check " to the other system */ + (void)sprintf(buff, "check %s", stbuf[i].st_id); + if (!REMwrite(buff, (int)strlen(buff), FALSE)) { + (void)fprintf(stderr, "Can't check article, %s\n", + strerror(errno)); + return TRUE; + } + STAToffered++; + if (stahead > 0) stahead--; + if (Debug) { + if (stbuf[i].st_retry) + (void)fprintf(stderr, "> %s (retry %d)\n", buff, stbuf[i].st_retry); + else + (void)fprintf(stderr, "> %s\n", buff); + } + if (GotInterrupt) + Interrupted(stbuf[i].st_fname, stbuf[i].st_id); + + /* That all. Response is checked later by listen() */ + return FALSE; + } + + /* Send article in "takethis streaming NNTP mode. + ** return TRUE on failure. + */ + STATIC BOOL + takethis(i) + int i; /* index to stbuf to be sent */ + { + char buff[NNTP_STRLEN]; + + if (!stbuf[i].st_qp) { /* should already be open but ... */ + /* Open the article. */ + if (!(stbuf[i].st_qp = QIOopen(stbuf[i].st_fname, QIO_BUFFER))) { + /* can not open it. Should check AltPath */ + if (AltPath && (*(stbuf[i].st_fname) != '/')) { + (void)sprintf(AltPath, "%s/%s", AltSpool, stbuf[i].st_fname); + stbuf[i].st_qp = QIOopen(AltPath, QIO_BUFFER); + } + if (!(stbuf[i].st_qp)) { + strel(i); + return FALSE; /* Not an error. Could be canceled or expired */ + } + } + } + /* send "takethis " to the other system */ + (void)sprintf(buff, "takethis %s", stbuf[i].st_id); + if (!REMwrite(buff, (int)strlen(buff), FALSE)) { + (void)fprintf(stderr, "Can't send takethis , %s\n", + strerror(errno)); + return TRUE; + } + if (Debug) + (void)fprintf(stderr, "> %s\n", buff); + if (GotInterrupt) + Interrupted((char *)0, (char *)0); + if (!REMsendarticle(stbuf[i].st_fname, stbuf[i].st_id, + stbuf[i].st_qp)) + return TRUE; + QIOclose(stbuf[i].st_qp); /* should not need file again */ + stbuf[i].st_qp = 0; /* so close to free descriptor */ + stbuf[i].st_age = 0; + if (stahead < STNBUFL) stahead++; + /* That all. Response is checked later by listen() */ + return FALSE; + } + /* ** Print a usage message and exit. */ *************** *** 781,787 **** Usage() { (void)fprintf(stderr, ! "Usage: innxmit [-a] [-d] [-M] [-p] [-r] [-S] [-t#] [-T#] host file\n"); exit(1); } --- 1061,1067 ---- Usage() { (void)fprintf(stderr, ! "Usage: innxmit [-a] [-c] [-d] [-M] [-p] [-r] [-s] [-S] [-t#] [-T#] host file\n"); exit(1); } *************** *** 801,812 **** FILE *From; FILE *To; char buff[NNTP_STRLEN]; - char *AltSpool; char *Article; char *ContentEncoding; char *ContentType; char *MessageID; - char *AltPath; SIGHANDLER (*old)(); unsigned int ConnectTimeout; unsigned int TotalTimeout; --- 1081,1090 ---- *************** *** 818,824 **** (void)umask(NEWSUMASK); /* Parse JCL. */ ! while ((i = getopt(ac, av, "A:adMprSt:T:v")) != EOF) switch (i) { default: Usage(); --- 1096,1102 ---- (void)umask(NEWSUMASK); /* Parse JCL. */ ! while ((i = getopt(ac, av, "A:acdMprsSt:T:v")) != EOF) switch (i) { default: Usage(); *************** *** 830,835 **** --- 1108,1116 ---- case 'a': AlwaysRewrite = TRUE; break; + case 'c': + DoCheck = FALSE; + break; case 'd': Debug = TRUE; break; *************** *** 843,848 **** --- 1124,1132 ---- case 'r': DoRequeue = FALSE; break; + case 's': + TryStream = FALSE; + break; case 'S': Slavish = TRUE; break; *************** *** 957,962 **** --- 1241,1289 ---- /* We no longer need standard I/O. */ FromServer = fileno(From); ToServer = fileno(To); + + if (TryStream) { + if (!REMwrite(modestream, (int)strlen(modestream), FALSE)) { + (void)fprintf(stderr, "Can't negotiate %s, %s\n", + modestream, strerror(errno)); + } + if (Debug) + (void)fprintf(stderr, ">%s\n", modestream); + /* Does he understand mode stream? */ + if (!REMread(buff, (int)sizeof buff)) { + (void)fprintf(stderr, "No reply to %s, %s\n", + modestream, strerror(errno)); + } else { + if (Debug) + (void)fprintf(stderr, "< %s", buff); + + /* Parse the reply. */ + switch (atoi(buff)) { + default: + (void)fprintf(stderr, "Unknown reply to \"%s\" -- %s", + modestream, buff); + CanStream = FALSE; + break; + case NNTP_OK_STREAM_VAL: /* YES! */ + CanStream = TRUE; + break; + case NNTP_BAD_COMMAND_VAL: /* normal refusal */ + CanStream = FALSE; + break; + } + } + if (CanStream) { + int i; + + for (i = 0; i < STNBUF; i++) { /* reset buffers */ + stbuf[i].st_fname = 0; + stbuf[i].st_id = 0; + stbuf[i].st_qp = 0; + } + stnq = 0; + stahead = 0; + } + } } /* Set up signal handlers. */ *************** *** 982,992 **** Interrupted(Article, MessageID); if ((Article = QIOread(BATCHqp)) == NULL) { - if (QIOerror(BATCHqp)) { - (void)fprintf(stderr, "Can't read \"%s\", %s\n", - BATCHname, strerror(errno)); - ExitWithStats(1); - } if (QIOtoolong(BATCHqp)) { (void)fprintf(stderr, "Skipping long line in \"%s\"\n", BATCHname); --- 1309,1314 ---- *************** *** 993,1001 **** --- 1315,1329 ---- (void)QIOread(BATCHqp); continue; } + if (QIOerror(BATCHqp)) { + (void)fprintf(stderr, "Can't read \"%s\", %s\n", + BATCHname, strerror(errno)); + ExitWithStats(1); + } /* Normal EOF -- we're done. */ QIOclose(BATCHqp); + BATCHqp = NULL; break; } *************** *** 1100,1107 **** Interrupted(Article, MessageID); /* Offer the article. */ (void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID); ! if (!REMwrite(buff, (int)strlen(buff))) { (void)fprintf(stderr, "Can't offer article, %s\n", strerror(errno)); QIOclose(qp); --- 1428,1475 ---- Interrupted(Article, MessageID); /* Offer the article. */ + if (CanStream) { + int i; + while (stnq >= STNBUFL) { /* need to empty a buffer */ + if (listen()) { + RequeueRestAndExit(Article, MessageID); + } + if ((stahead > 0) && (stnq < STNBUF)) break; + } + /* save new article in the buffer */ + i = stalloc(Article, MessageID, qp); + if (i < 0) { + QIOclose(qp); + RequeueRestAndExit(Article, MessageID); + } + if (DoCheck && (stnofail < STNC)) { + if (check(i)) { + RequeueRestAndExit((char *)NULL, (char *)NULL); + } + } else { + if (takethis(i)) { + RequeueRestAndExit((char *)NULL, (char *)NULL); + } + } + for (i = 0; i < STNBUF; i++) { + if ((stbuf[i].st_fname) && (stbuf[i].st_fname[0] != '\0')) { + if (stbuf[i].st_age++ > stnq) { + /* This should not happen but just in case ... */ + if (stbuf[i].st_retry < STNRETRY) { + if (check(i)) return TRUE; /* resend check */ + stbuf[i].st_retry++; + stbuf[i].st_age = 0; + } else { /* requeue to disk for later */ + Requeue(stbuf[i].st_fname, stbuf[i].st_id); + strel(i); /* release entry */ + } + } + } + } + continue; /* next article */ + } (void)sprintf(buff, "%s %s", Slavish ? "xreplic" : "ihave", MessageID); ! if (!REMwrite(buff, (int)strlen(buff), FALSE)) { (void)fprintf(stderr, "Can't offer article, %s\n", strerror(errno)); QIOclose(qp); *************** *** 1154,1159 **** --- 1522,1534 ---- } QIOclose(qp); + } + if (CanStream) { /* need to wait for rest of ACKs */ + while (stnq > 0) { + if (listen()) { + RequeueRestAndExit((char *)NULL, (char *)NULL); + } + } } if (BATCHfp != NULL) *** innd/nc.c.orig Thu Mar 18 13:04:28 1993 --- innd/nc.c Wed Apr 26 16:25:29 1995 *************** *** 6,11 **** --- 6,13 ---- #include "innd.h" #include "dbz.h" + /* for debugging the "mode stream" code */ + #undef STR_DEBUG #define BAD_COMMAND_COUNT 10 #define WIP_CHECK (1 * 60) *************** *** 52,57 **** --- 54,62 ---- static FUNCTYPE NCxpath(); static FUNCTYPE NCxreplic(); static FUNCTYPE NC_unimp(); + /* new modules for streaming */ + static FUNCTYPE NCcheck(); + static FUNCTYPE NCtakethis(); STATIC int NCcount; /* Number of open connections */ STATIC int NCwipsize; /* Size of NCwip array */ *************** *** 67,72 **** --- 72,79 ---- { "authinfo", NCauthinfo }, { "help", NChelp }, { "ihave", NCihave }, + { "check", NCcheck }, + { "takethis", NCtakethis }, { "list", NClist }, { "mode", NCmode }, { "quit", NCquit }, *************** *** 131,136 **** --- 138,179 ---- /* + ** Write an NNTP reply message. + ** Call only when we will stay in NCreader mode. + ** Tries to do the actual write if it will not block. + */ + STATIC void + NCwritereply(cp, text) + CHANNEL *cp; + char *text; + { + register BUFFER *bp; + register int i; + + bp = &cp->Out; + i = bp->Left; + WCHANappend(cp, text, (int)strlen(text)); /* text in buffer */ + WCHANappend(cp, NCterm, STRLEN(NCterm)); /* add CR NL to text */ + if (i == 0) { /* if only data then try to write directly */ + i = write(cp->fd, &bp->Data[bp->Used], bp->Left); + #ifdef STR_DEBUG + syslog(L_NOTICE, "%s NCwritereply %d=write(%d, \"%.15s\", %d)", + CHANname(cp), i, cp->fd, &bp->Data[bp->Used], bp->Left); + #endif STR_DEBUG + if (i > 0) bp->Used += i; + if (bp->Used == bp->Left) bp->Used = bp->Left = 0; + else i = 0; + } else i = 0; + if (i <= 0) { /* write failed, queue it for later */ + RCHANremove(cp); + WCHANadd(cp); + } + if (Tracing || cp->Tracing) + syslog(L_TRACE, "%s > %s", CHANname(cp), text); + } + + + /* ** Tell the NNTP channel to go away. */ STATIC void *************** *** 188,197 **** case OMrunning: wp = &NCwip[cp->fd]; response = ARTpost(cp, AmSlave ? &wp->Replic : NULL, wp->MessageID); ! if (atoi(response) == NNTP_TOOKIT_VAL) cp->Received++; ! else cp->Rejected++; cp->Reported++; if (cp->Reported >= NNTP_ACTIVITY_SYNC) { syslog(L_NOTICE, --- 231,250 ---- case OMrunning: wp = &NCwip[cp->fd]; response = ARTpost(cp, AmSlave ? &wp->Replic : NULL, wp->MessageID); ! if (atoi(response) == NNTP_TOOKIT_VAL) { cp->Received++; ! if (cp->Sendid.Size > 3) { /* We be streaming */ ! char buff[4]; ! (void)sprintf(buff, "%d", NNTP_OK_RECID_VAL); ! cp->Sendid.Data[0] = buff[0]; ! cp->Sendid.Data[1] = buff[1]; ! cp->Sendid.Data[2] = buff[2]; ! response = cp->Sendid.Data; ! } ! } else { cp->Rejected++; + if (cp->Sendid.Size) response = cp->Sendid.Data; + } cp->Reported++; if (cp->Reported >= NNTP_ACTIVITY_SYNC) { syslog(L_NOTICE, *************** *** 200,206 **** cp->Received, cp->Refused, cp->Rejected); cp->Reported = 0; } ! NCwritetext(cp, response); cp->State = CSgetcmd; break; --- 253,259 ---- cp->Received, cp->Refused, cp->Rejected); cp->Reported = 0; } ! NCwritereply(cp, response); cp->State = CSgetcmd; break; *************** *** 524,534 **** if (HIShavearticle(p)) { cp->Refused++; ! NCwritetext(cp, NNTP_HAVEIT); } else if (NCinprogress(cp, p, &who)) { #if defined(NNTP_RESENDIT_LATER) ! NCwritetext(cp, NNTP_RESENDIT_LATER); #else /* Somebody else is sending it to us; wait until they're done. */ who->Wanted = TRUE; --- 577,587 ---- if (HIShavearticle(p)) { cp->Refused++; ! NCwritereply(cp, NNTP_HAVEIT); } else if (NCinprogress(cp, p, &who)) { #if defined(NNTP_RESENDIT_LATER) ! NCwritereply(cp, NNTP_RESENDIT_LATER); #else /* Somebody else is sending it to us; wait until they're done. */ who->Wanted = TRUE; *************** *** 539,545 **** #endif /* defined(NNTP_RESENDIT_LATER) */ } else { ! NCwritetext(cp, NNTP_SENDIT); cp->State = CSgetarticle; } } --- 592,598 ---- #endif /* defined(NNTP_RESENDIT_LATER) */ } else { ! NCwritereply(cp, NNTP_SENDIT); cp->State = CSgetarticle; } } *************** *** 606,612 **** h = HOnnrpd; else if (caseEQ(p, "query")) h = HOnnrqd; ! else { NCwritetext(cp, NCbadcommand); return; } --- 659,670 ---- h = HOnnrpd; else if (caseEQ(p, "query")) h = HOnnrqd; ! else if (caseEQ(p, "stream")) { ! char buff[16]; ! (void)sprintf(buff, "%d StreamOK.", NNTP_OK_STREAM_VAL); ! NCwritetext(cp, buff); ! return; ! } else { NCwritetext(cp, NCbadcommand); return; } *************** *** 779,785 **** --- 837,849 ---- char buff[SMBUF]; char *av[2]; int i; + int rest; + int SaveUsed; + #ifdef STR_DEBUG + syslog(L_NOTICE, "%s NCreader Used=%d", + CHANname(cp), cp->In.Used); + #endif STR_DEBUG /* Read any data that's there; ignore errors (retry next time it's our * turn) and if we got nothing, then it's EOF so mark it closed. */ if ((i = CHANreadtext(cp)) < 0) { *************** *** 801,946 **** bp = &cp->In; p = &bp->Data[bp->Used]; ! switch (cp->State) { ! default: ! syslog(L_ERROR, "%s internal NCreader state %d", ! CHANname(cp), cp->State); ! break; ! ! case CSgetcmd: ! case CSgetauth: ! /* Did we get the whole command, terminated with "\r\n"? */ ! if (bp->Used < 2 || p[-2] != '\r' || p[-1] != '\n') break; - p[-2] = '\0'; - bp->Used -= 2; ! /* Ignore blank lines. */ ! if (bp->Used == 0) ! break; ! if (Tracing || cp->Tracing) ! syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data); ! /* We got something -- stop sleeping (in case we were). */ ! SCHANremove(cp); ! if (cp->Argument != NULL) { ! DISPOSE(cp->Argument); ! cp->Argument = NULL; ! } ! if (cp->State == CSgetauth) { ! if (caseEQn(bp->Data, "mode", 4)) ! NCmode(cp); ! else ! NCauthinfo(cp); ! break; ! } ! /* Loop through the command table. */ ! for (p = bp->Data, dp = NCcommands; dp < ENDOF(NCcommands); dp++) ! if (caseEQn(p, dp->Name, dp->Size)) { ! (*dp->Function)(cp); ! cp->BadCommands = 0; break; } ! if (dp == ENDOF(NCcommands)) { ! NCwritetext(cp, NCbadcommand); ! if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) ! cp->State = CSwritegoodbye; ! for (i = 0; (p = NCquietlist[i]) != NULL; i++) ! if (caseEQ(p, dp->Name)) break; ! if (p == NULL) ! syslog(L_NOTICE, "%s bad_command %s", ! CHANname(cp), MaxLength(bp->Data, bp->Data)); ! } ! break; ! case CSgetarticle: ! case CSgetrep: ! /* Reading an article; look for "\r\n.\r\n" terminator. */ ! if (!ART_EOF(bp->Used, p)) { /* Check for the null article. */ ! if (bp->Used == 3 ! && p[-3] == '.' && p[-2] == '\r' && p[-1] == '\n') { cp->Rejected++; ! NCwritetext(cp, NNTP_REJECTIT_EMPTY); cp->State = CSgetcmd; bp->Used = 0; /* Clear the work-in-progress entry. */ NCclearwip(wp); } ! ! /* Check for big articles. */ ! if (LargestArticle > SAVE_AMT && bp->Used > LargestArticle) { ! /* Make some room, saving only the last few bytes. */ ! for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++) ! p[0] = p[bp->Used - SAVE_AMT]; ! wp->Size += bp->Used - SAVE_AMT; ! bp->Used = SAVE_AMT; ! cp->State = CSeatarticle; } ! break; ! } ! /* Strip article terminator and post the article. */ ! p[-3] = '\0'; ! bp->Used -= 3; ! SCHANremove(cp); ! if (cp->Argument != NULL) { ! DISPOSE(cp->Argument); ! cp->Argument = NULL; ! } ! NCclean(bp); ! NCpostit(cp); ! break; ! ! case CSeatarticle: ! /* Eat the article and then complain that it was too large */ ! if (ART_EOF(bp->Used, p)) { ! /* Reached the end of the article. */ SCHANremove(cp); if (cp->Argument != NULL) { DISPOSE(cp->Argument); cp->Argument = NULL; } ! p = wp->MessageID; ! i = wp->Size + bp->Used; ! syslog(L_ERROR, "%s internal rejecting huge article %s (%d > %d)", ! CHANname(cp), p ? p : "(null)", i, LargestArticle); ! (void)sprintf(buff, "%d Article exceeds local limit of %ld bytes", ! NNTP_REJECTIT_VAL, LargestArticle); ! NCwritetext(cp, buff); cp->State = CSgetcmd; ! cp->Rejected++; ! /* Write a local cancel entry so nobody else gives it to us. */ ! if (p) { ! av[0] = p; ! av[1] = NULL; ! if ((q = CCcancel(av)) != NULL) ! syslog(L_ERROR, "%s cant cancel %s %s", LogName, av[0], q); } ! /* Clear the work-in-progress entry. */ ! NCclearwip(wp); ! /* Reset input buffer to the default size; don't let realloc ! * be lazy. */ ! DISPOSE(bp->Data); ! bp->Size = START_BUFF_SIZE; ! bp->Used = 0; ! bp->Data = NEW(char, bp->Size); } ! else if (bp->Used > 8 * 1024) { ! /* Make some room; save the last few bytes of the article */ ! for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++) ! p[0] = p[bp->Used - SAVE_AMT + 0]; ! wp->Size += bp->Used - SAVE_AMT; ! bp->Used = SAVE_AMT; ! } ! break; } } --- 865,1080 ---- bp = &cp->In; p = &bp->Data[bp->Used]; ! rest = SaveUsed = bp->Used; ! for ( ; ; ) { ! #ifdef STR_DEBUG ! if (bp->Used > 15) ! syslog(L_NOTICE, "%s NCreader state=%d next \"%.15s\"", ! CHANname(cp), cp->State, bp->Data); ! #endif STR_DEBUG ! switch (cp->State) { ! default: ! syslog(L_ERROR, "%s internal NCreader state %d", ! CHANname(cp), cp->State); break; ! case CSgetcmd: ! case CSgetauth: ! /* Did we get the whole command, terminated with "\r\n"? */ ! for (i = 0; (i < bp->Used) && (bp->Data[i] != '\n'); i++) ; ! if (i < bp->Used) rest = bp->Used = ++i; ! else { ! rest = 0; ! break; /* come back later for rest of line */ ! } ! p = &bp->Data[rest]; ! if (rest < 2 || p[-2] != '\r' || p[-1] != '\n') ! break; ! p[-2] = '\0'; ! bp->Used -= 2; ! /* Ignore blank lines. */ ! if (bp->Data[0] == '\0') ! break; ! if (Tracing || cp->Tracing) ! syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data); ! /* We got something -- stop sleeping (in case we were). */ ! SCHANremove(cp); ! if (cp->Argument != NULL) { ! DISPOSE(cp->Argument); ! cp->Argument = NULL; ! } ! if (cp->State == CSgetauth) { ! if (caseEQn(bp->Data, "mode", 4)) ! NCmode(cp); ! else ! NCauthinfo(cp); break; } ! ! /* Loop through the command table. */ ! for (p = bp->Data, dp = NCcommands; dp < ENDOF(NCcommands); dp++) ! if (caseEQn(p, dp->Name, dp->Size)) { ! (*dp->Function)(cp); ! cp->BadCommands = 0; break; ! } ! if (dp == ENDOF(NCcommands)) { ! NCwritetext(cp, NCbadcommand); ! if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) { ! cp->State = CSwritegoodbye; ! rest = SaveUsed; ! } ! for (i = 0; (p = NCquietlist[i]) != NULL; i++) ! if (caseEQ(p, dp->Name)) ! break; ! if (p == NULL) ! syslog(L_NOTICE, "%s bad_command %s", ! CHANname(cp), MaxLength(bp->Data, bp->Data)); ! } ! break; ! case CSgetarticle: ! case CSgetrep: /* Check for the null article. */ ! if ((bp->Used >= 3) && (bp->Data[0] == '.') ! && (bp->Data[1] == '\r') && (bp->Data[2] == '\n')) { ! rest = 3; /* null article (canceled?) */ cp->Rejected++; ! if (cp->Sendid.Size > 3) { /* We be streaming */ ! char buff[4]; ! (void)sprintf(buff, "%d", NNTP_ERR_FAILID_VAL); ! cp->Sendid.Data[0] = buff[0]; ! cp->Sendid.Data[1] = buff[1]; ! cp->Sendid.Data[2] = buff[2]; ! NCwritereply(cp, cp->Sendid.Data); ! } ! else NCwritetext(cp, NNTP_REJECTIT_EMPTY); cp->State = CSgetcmd; bp->Used = 0; /* Clear the work-in-progress entry. */ NCclearwip(wp); + break; } ! /* Reading an article; look for "\r\n.\r\n" terminator. */ ! for (i = 5; i <= bp->Used; i++) { ! if ((bp->Data[i - 5] == '\r') ! && (bp->Data[i - 4] == '\n') ! && (bp->Data[i - 3] == '.') ! && (bp->Data[i - 2] == '\r') ! && (bp->Data[i - 1] == '\n')) { ! rest = bp->Used = i; ! p = &bp->Data[i]; ! break; ! } } ! if (i > bp->Used) { /* did not find terminator */ ! /* Check for big articles. */ ! if (LargestArticle > SAVE_AMT && bp->Used > LargestArticle) { ! /* Make some room, saving only the last few bytes. */ ! for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++) ! p[0] = p[bp->Used - SAVE_AMT]; ! wp->Size += bp->Used - SAVE_AMT; ! bp->Used = SAVE_AMT; ! cp->State = CSeatarticle; ! } else rest = 0; ! break; ! } ! /* Strip article terminator and post the article. */ ! p[-3] = '\0'; ! bp->Used -= 2; SCHANremove(cp); if (cp->Argument != NULL) { DISPOSE(cp->Argument); cp->Argument = NULL; } ! NCclean(bp); ! NCpostit(cp); cp->State = CSgetcmd; ! break; ! case CSeatarticle: ! /* Eat the article and then complain that it was too large */ ! /* Reading an article; look for "\r\n.\r\n" terminator. */ ! for (i = 5; i <= bp->Used; i++) { ! if ((bp->Data[i - 5] == '\r') ! && (bp->Data[i - 4] == '\n') ! && (bp->Data[i - 3] == '.') ! && (bp->Data[i - 2] == '\r') ! && (bp->Data[i - 1] == '\n')) { ! rest = bp->Used = i; ! p = &bp->Data[i]; ! break; ! } } + if (i <= bp->Used) { /* did find terminator */ + /* Reached the end of the article. */ + SCHANremove(cp); + if (cp->Argument != NULL) { + DISPOSE(cp->Argument); + cp->Argument = NULL; + } + p = wp->MessageID; + i = wp->Size + bp->Used; + syslog(L_ERROR, "%s internal rejecting huge article %s (%d > %d)", + CHANname(cp), p ? p : "(null)", i, LargestArticle); + (void)sprintf(buff, "%d Article exceeds local limit of %ld bytes", + NNTP_REJECTIT_VAL, LargestArticle); + if (cp->Sendid.Size) NCwritetext(cp, cp->Sendid.Data); + else NCwritetext(cp, buff); + cp->State = CSgetcmd; + cp->Rejected++; ! /* Write a local cancel entry so nobody else gives it to us. */ ! if (p) { ! av[0] = p; ! av[1] = NULL; ! if ((q = CCcancel(av)) != NULL) ! syslog(L_ERROR, "%s cant cancel %s %s", LogName, av[0], q); ! } ! /* Clear the work-in-progress entry. */ ! NCclearwip(wp); ! ! /* Reset input buffer to the default size; don't let realloc ! * be lazy. */ ! DISPOSE(bp->Data); ! bp->Size = START_BUFF_SIZE; ! bp->Used = 0; ! bp->Data = NEW(char, bp->Size); ! } ! else if (bp->Used > 8 * 1024) { ! /* Make some room; save the last few bytes of the article */ ! for (p = bp->Data, i = 0; i < SAVE_AMT; p++, i++) ! p[0] = p[bp->Used - SAVE_AMT + 0]; ! wp->Size += bp->Used - SAVE_AMT; ! bp->Used = SAVE_AMT; ! } ! break; } ! #ifdef STR_DEBUG ! syslog(L_NOTICE, "%s NCreader rest=%d Used=%d SaveUsed=%d", ! CHANname(cp), rest, bp->Used, SaveUsed); ! #endif STR_DEBUG ! if (rest > 0) { ! if (rest < SaveUsed) { /* more commands in buffer */ ! bp->Used = SaveUsed = SaveUsed - rest; ! /* It would be nice to avoid this copy but that ! ** would require changes to the bp structure and ! ** the way it is used. ! */ ! /* strncpy(bp->Data, &bp->Data[rest], bp->Used); */ ! (void)memcpy((POINTER)bp->Data, (POINTER)&bp->Data[rest], (SIZE_T)bp->Used); ! rest = bp->Used; ! } else { ! bp->Used = 0; ! break; ! } ! } else break; } } *************** *** 1055,1058 **** --- 1189,1279 ---- cp->BadCommands = 0; NCwritetext(cp, NCgreeting); return cp; + } + + + /* These modules support the streaming option to tranfer articles + ** faster. + */ + + /* + ** The "check" command. Check the Message-ID, and see if we want the + ** article or not. Stay in command state. + */ + STATIC FUNCTYPE + NCcheck(cp) + CHANNEL *cp; + { + register char *p; + int msglen; + WIP *who; + + if (AmSlave) { + NCwritetext(cp, NCbadcommand); + return; + } + + /* Snip off the Message-ID. */ + for (p = cp->In.Data; !ISWHITE(*p); p++) + continue; + for ( ; ISWHITE(*p); p++) + continue; + msglen = strlen(p) + 5; /* 3 digits + space + id + null */ + if (cp->Sendid.Size < msglen) { + if (cp->Sendid.Size > 0) DISPOSE(cp->Sendid.Data); + if (msglen > MAXHEADERSIZE) cp->Sendid.Size = msglen; + else cp->Sendid.Size = MAXHEADERSIZE; + cp->Sendid.Data = NEW(char, cp->Sendid.Size); + } + if (!ARTidok(p)) { + (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_GOTID_VAL, p); + NCwritereply(cp, cp->Sendid.Data); + syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p)); + return; + } + + if (HIShavearticle(p)) { + cp->Refused++; + (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_GOTID_VAL, p); + NCwritereply(cp, cp->Sendid.Data); + } else if (NCinprogress(cp, p, &who)) { + (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_RESENDID_VAL, p); + NCwritereply(cp, cp->Sendid.Data); + } else { + (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_OK_SENDID_VAL, p); + NCwritereply(cp, cp->Sendid.Data); + } + /* stay in command mode */ + } + + /* + ** The "takethis" command. Article follows. + ** Remember for later ack. + */ + STATIC FUNCTYPE + NCtakethis(cp) + CHANNEL *cp; + { + register char *p; + int msglen; + + /* Snip off the Message-ID. */ + for (p = cp->In.Data; !ISWHITE(*p); p++) + continue; + for ( ; ISWHITE(*p); p++) + continue; + if (!ARTidok(p)) { + syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p)); + } + msglen = strlen(p) + 5; /* 3 digits + space + id + null */ + if (cp->Sendid.Size < msglen) { + if (cp->Sendid.Size > 0) DISPOSE(cp->Sendid.Data); + if (msglen > MAXHEADERSIZE) cp->Sendid.Size = msglen; + else cp->Sendid.Size = MAXHEADERSIZE; + cp->Sendid.Data = NEW(char, cp->Sendid.Size); + } + /* save ID for later NACK or ACK */ + (void)sprintf(cp->Sendid.Data, "%d %s", NNTP_ERR_FAILID_VAL, p); + + cp->State = CSgetarticle; } *** include/nntp.h.orig Wed Jul 27 21:50:02 1994 --- include/nntp.h Fri Jan 6 19:07:59 1995 *************** *** 83,88 **** --- 83,105 ---- #define NNTP_CANTPOST "440 Posting not allowed" #define NNTP_CANTPOST_VAL 440 + /* new entries for the "streaming" protocol */ + /* response to "mode stream" else 500 if stream not supported */ + #define NNTP_OK_STREAM_VAL 203 /* Streaming supported */ + + /* response to "check ". Must include ID of article. + ** Example: "431 <1234@host.domain>" + */ + #define NNTP_OK_SENDID_VAL 238 /* I want article */ + #define NNTP_RESENDID_VAL 431 /* try again later */ + #define NNTP_ERR_GOTID_VAL 438 /* Got , don't send */ + + /* responses to "takethis . Must include ID of article */ + #define NNTP_OK_RECID_VAL 239 /* Article received OK */ + #define NNTP_ERR_FAILID_VAL 439 /* Transfer of failed */ + + /* End of new entries for the "streaming" protocol */ + /* ** The first character of an NNTP reply can be used as a category class. ===END PATCH===