From b5f3f48f0d7973ef32ba58265367b508eff9e4b8 Mon Sep 17 00:00:00 2001 From: Sergey Dorofeev Date: Sat, 4 Feb 2012 18:20:48 +0400 Subject: [PATCH] netspool send --- source/bforce/netspool.c | 92 +++++++++++++++++++++--------------- source/bforce/prot_common.c | 93 ++++++++++++++++++++++++++++++------- 2 files changed, 130 insertions(+), 55 deletions(-) diff --git a/source/bforce/netspool.c b/source/bforce/netspool.c index cf3a085..ec3e4b4 100644 --- a/source/bforce/netspool.c +++ b/source/bforce/netspool.c @@ -188,62 +188,80 @@ void netspool_receive(s_netspool_state *state) } +int netspool_read(s_netspool_state *state, void *buf, int buflen) +{ + int n; + if( state->length == 0 ) { + puts("everithing is read"); + return 0; + } + + n = recv(state->socket, buf, state->length>buflen? buflen: state->length, 0); + + if(n==0) { + state->state = NS_ERROR; + state->error = "remote socket shutdown"; + return -1; + } + + if(n==-1) { + puts("error reading data"); + printf("%d %s\n", errno, strerror(errno)); + state->state = NS_ERROR; + state->error = "IO error"; + return -1; + } + + state->length -= n; + return n; +} + +void netspool_acknowledge(s_netspool_state *state) +{ + char strbuf[STRBUF]; + int r; + + if( state->length > 0 ) { + state->state = NS_ERROR; + state->error = "Too early acknowledgement"; + return; + } + + snprintf(strbuf, STRBUF, "DONE %s", state->filename); + r = sendstr(state->socket, strbuf); + if( r ) { state->state = NS_ERROR; state->error = "IO error"; return; } + + state->state = NS_RECEIVING; + state->filename[0] = 0; +} + +void netspool_end(s_netspool_state *state) +{ + sendstr(state->socket, "END satisfied"); + close(state->socket); + state->state = NS_NOTINIT; +} + /* void savefile(const char *fn, unsigned long long l, int s) { char BUF[STRBUF]; int n, n1; int f; - f=open(fn, O_CREAT|O_EXCL|O_WRONLY, 0666); + f=open(fn, O_CREAT|O_EXCL|O_WRONLY, 0664); if(f==-1) { puts("error open file"); printf("%d %s\n", errno, strerror(errno)); exit(-3); } while(l) { - n=recv(s, BUF, l>STRBUF?STRBUF:l, 0); - if(n==0){ - puts("remote socket shutdown"); - exit(-3); - } - if(n==-1){ - puts("error reading data"); - printf("%d %s\n", errno, strerror(errno)); - exit(-3); - } n1 = write(f, BUF, n); if(n1!=n) { puts("error writing file"); printf("%d %s\n", errno, strerror(errno)); exit(-3); } - l-=n; } close(f); } - - -void main() { - char filename[STRBUF]; - unsigned long long length; - - - - filename[0]=0; - while(1) { - savefile(filename, length, s); - sprintf(strbuf, "DONE %s", filename); - sendstr(s, strbuf); - filename[0]=0; - continue; - } - - puts("!!!"); - exit(-1); - } - - sendstr(s, "END satisfied"); - close(s); - -} */ diff --git a/source/bforce/prot_common.c b/source/bforce/prot_common.c index ba9237d..529ac5e 100644 --- a/source/bforce/prot_common.c +++ b/source/bforce/prot_common.c @@ -199,36 +199,43 @@ int p_tx_fopen(s_protinfo *pi) return 1; get_next_file: - if( prot_get_next_file(&ptrl, pi) || !ptrl ) + if( prot_get_next_file(&ptrl, pi) ) return 1; + + if( ptrl ) { - /* Mark this file as "processed" */ - ptrl->status = STATUS_SENDING; + /* Mark this file as "processed" */ + ptrl->status = STATUS_SENDING; - pi->send_left_num -= 1; - pi->send_left_size -= ptrl->size; + pi->send_left_num -= 1; + pi->send_left_size -= ptrl->size; - if( pi->send_left_size < 0 ) + if( pi->send_left_size < 0 ) pi->send_left_size = 0; - if( pi->send_left_num < 0 ) + if( pi->send_left_num < 0 ) pi->send_left_num = 0; + + DEB((D_PROT, "p_tx_fopen: now opening \"%s\"", ptrl->fname)); - /* Reset MinCPS time counter */ - pi->tx_low_cps_time = 0; - - DEB((D_PROT, "p_tx_fopen: now opening \"%s\"", ptrl->fname)); - - if( stat(ptrl->fname, &st) ) - { + if( stat(ptrl->fname, &st) ) { logerr("send: cannot stat file \"%s\"", ptrl->fname); goto get_next_file; - } + } - if( (fp = file_open(ptrl->fname, "r")) == NULL ) - { + if( (fp = file_open(ptrl->fname, "r")) == NULL ) { logerr("send: cannot open file \"%s\"", ptrl->fname); goto get_next_file; + } + + } +#ifndef NETSPOOL + else { + return 1; } +#endif + + /* Reset MinCPS time counter */ + pi->tx_low_cps_time = 0; /* * Add new entry to the send files queue @@ -250,6 +257,25 @@ get_next_file: /* * Set file information */ +#ifdef NETSPOOL + if( !ptrl ) { + /* send file received through netspool */ + pi->send->fp = 0; + pi->send->local_name = "NETSPOOL"; + pi->send->type = out_filetype(state.netspool.filename); + pi->send->net_name = recode_file_out(p_convfilename(state.netspool.filename, pi->send->type)); + pi->send->fname = NULL; + pi->send->mod_time = time(NULL); + pi->send->mode = 0664; + pi->send->bytes_total = state.netspool.length; + pi->send->start_time = time(NULL); + pi->send->eofseen = FALSE; + pi->send->status = FSTAT_PROCESS; + pi->send->action = ACTION_ACKNOWLEDGE; + pi->send->flodsc = -1; + } else { + +#endif pi->send->fp = fp; pi->send->local_name = (char*)xstrcpy(file_getname(ptrl->fname)); pi->send->net_name = recode_file_out(p_convfilename(pi->send->local_name, ptrl->type)); @@ -265,7 +291,11 @@ get_next_file: pi->send->type = ptrl->type; pi->send->action = ptrl->action; pi->send->flodsc = ptrl->flodsc; - + +#idef NETSPOOL + } +#endif + if( strcmp(pi->send->local_name, pi->send->net_name) == 0 ) { log("send: \"%s\" %d bytes", @@ -404,6 +434,11 @@ int p_tx_fclose(s_protinfo *pi) else if( errno != ENOENT ) logerr("send: cannot truncate file \"%s\"", pi->send->fname); break; +#ifdef NETSPOOL + case ACTION_ACKNOWLEDE: + netspool_acknowlede(&state.netspool); + break; +#endif } } @@ -429,6 +464,24 @@ int p_tx_readfile(char *buffer, size_t buflen, s_protinfo *pi) struct stat st; long ftell_pos; +#ifdef NETSPOOL + if(pi->send->fname==NULL && strcmp(pi->send->localname, "NETSPOOL")==0 ) { + if( state->state != NS_RECVFILE ) { + log("send: wrong netspool state"); + pi->send->status = FSTAT_SKIPPED; + return -2; + } + n = netspool_read(&state.netspool, buffer, buflen); + pi->send->eofseen = state.netspool.length == 0; + if( n==-1 ) { + log("send: netspool error"); + log(netspool->state->error); + pi->send->status = FSTAT_SKIPPED; + return -2; + } + return n; + } +#endif /* * Sanity check: read from closed file. */ @@ -1223,6 +1276,10 @@ void p_session_cleanup(s_protinfo *pi, bool success) logerr("cannot unlink temporary file \"%s\"", ptrl->fname); } } + +#ifdef NETSPOOL + netspool_end(&state.netspool); +#endif } /*****************************************************************************