Logo Search packages:      
Sourcecode: udpcast version File versions  Download package

senddata.c

#include <sys/types.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>

#include "fec.h"
#include "log.h"
#include "util.h"
#include "socklib.h"
#include "fifo.h"
#include "produconsum.h"
#include "udpcast.h"
#include "udp-sender.h"
#include "udpc-protoc.h"
#include "rate-limit.h"
#include "statistics.h"

#define DEBUG 0

typedef struct slice {
    int base; /* base address of slice in buffer */
    int sliceNo;
    int bytes; /* bytes in slice */
    int nextBlock; /* index of next buffer to be transmitted */
    volatile enum { 
      SLICE_FREE, /* free slice, and in the queue of free slices */
      SLICE_NEW, /* newly allocated. FEC calculation and first 
                * transmission */
      SLICE_XMITTED, /* transmitted */
      SLICE_ACKED, /* acknowledged (if applicable) */
      SLICE_PRE_FREE /* no longer used, but not returned to queue */
    } state;
    char rxmitMap[MAX_SLICE_SIZE / BITS_PER_CHAR]; 
    /* blocks to be retransmitted */

    char isXmittedMap[MAX_SLICE_SIZE / BITS_PER_CHAR]; 
   /* blocks which have already been retransmitted during this round*/

    int rxmitId; /* used to distinguish among several retransmission 
              * requests, so that we can easily discard answers to "old"
              * requests */

    /* This structure is used to keep track of clients who answered, and
     * to make the reqack message
     */
00050     struct reqackBm {
      struct reqack ra;
      char readySet[MAX_CLIENTS / BITS_PER_CHAR]; /* who is already ok? */
    } sl_reqack;

    char answeredSet[MAX_CLIENTS / BITS_PER_CHAR]; /* who answered at all? */

    int nrReady; /* number of participants who are ready */
    int nrAnswered; /* number of participants who answered; */
    int needRxmit; /* does this need retransmission? */
    int lastGoodBlock; /* last good block of slice (i.e. last block having not
                  * needed retransmission */

    int lastReqack; /* last req ack sent (debug) */
#ifdef BB_FEATURE_UDPCAST_FEC
    unsigned char *fec_data;
#endif
} *slice_t;

#define QUEUE_SIZE 256

00071 struct returnChannel {
    pthread_t thread; /* message receiving thread */
    int socket; /* socket on which we receive the messages */
    produconsum_t incoming; /* where to enqueue incoming messages */
    produconsum_t free; /* free space */
    struct {
      int clNo; /* client number */
      union message msg; /* its message */
    } q[QUEUE_SIZE];
    struct net_config *config;
    participantsDb_t participantsDb;
    int stopIt; /* if set, return channel should stop */
};

#define NR_SLICES 2

00087 typedef struct senderState {
    struct returnChannel rc;
    struct fifo *fifo;

    struct net_config *config;
    sender_stats_t stats;
    int socket;
    int endianness;
    
    struct slice slices[NR_SLICES];

    produconsum_t free_slices_pc;

    unsigned char *fec_data;
    pthread_t fec_thread;
    produconsum_t fec_data_pc;
} *sender_state_t;


/*
 * Returns the number of blocks needed to hold the slice's bytes; a partially
 * filled last block counts as a whole block.
 */
static int getSliceBlocks(struct slice *slice, struct net_config *net_config)
{
    int roundedUp = slice->bytes + net_config->blockSize - 1;

    return roundedUp / net_config->blockSize;
}

/* Returns 1 if the slice is in state SLICE_ACKED, 0 otherwise. */
static int isSliceAcked(struct slice *slice)
{
#if DEBUG
    flprintf("Is slice %d acked?  %d\n", slice->sliceNo,
           slice->state);
#endif
    return (slice->state == SLICE_ACKED) ? 1 : 0;
}

/* Returns 1 if the slice is in state SLICE_XMITTED, 0 otherwise. */
static int isSliceXmitted(struct slice *slice) {
    return (slice->state == SLICE_XMITTED) ? 1 : 0;
}


/*
 * Releases a slice.
 *
 * The slice is first only marked SLICE_PRE_FREE. Then, starting at the
 * producer position of the free-slice queue, every slice found in state
 * SLICE_PRE_FREE is switched to SLICE_FREE and produced into the queue; the
 * walk stops at the first slice that is in any other state.
 *
 * Always returns 0.
 */
static int freeSlice(sender_state_t sendst, struct slice *slice) {
    int idx;

    idx = slice - sendst->slices; /* only used by the debug trace */
#if DEBUG
    flprintf("Freeing slice %p %d %d\n", slice, slice->sliceNo, idx);
#endif
    slice->state = SLICE_PRE_FREE;
    for(;;) {
        struct slice *head =
            &sendst->slices[pc_getProducerPosition(sendst->free_slices_pc)];

        if(head->state != SLICE_PRE_FREE)
            break;
        head->state = SLICE_FREE;
        pc_produce(sendst->free_slices_pc, 1);
    }
    return 0;
}

/*
 * Takes a free slice and fills it with the next chunk of data from the FIFO.
 *
 * The slice is zeroed, then sized as follows: at most blockSize * sliceSize
 * bytes, and, if it holds more than one block, truncated to a whole number
 * of blocks. The bytes are marked as consumed in the FIFO's data queue and
 * the slice is put into state SLICE_NEW.
 *
 * Returns the prepared slice.
 */
static struct slice *makeSlice(sender_state_t sendst, int sliceNo) {
    struct net_config *config = sendst->config;
    struct fifo *fifo = sendst->fifo;
    struct slice *slice = NULL;
    int slot;
    int maxBytes;
    int bytes;

    /* grab the next free slice */
    pc_consume(sendst->free_slices_pc, 1);
    slot = pc_getConsumerPosition(sendst->free_slices_pc);
    slice = &sendst->slices[slot];
    assert(slice->state == SLICE_FREE);
    BZERO(*slice);
    pc_consumed(sendst->free_slices_pc, 1);

    slice->base = pc_getConsumerPosition(sendst->fifo->data);
    slice->sliceNo = sliceNo;
    bytes = pc_consume(sendst->fifo->data, 10*config->blockSize);

    /* fixme: use current slice size here */
    maxBytes = config->blockSize * config->sliceSize;
    if(bytes > maxBytes)
        bytes = maxBytes;

    /* only a slice of at most one block may end in a partial block */
    if(bytes > config->blockSize)
        bytes -= bytes % config->blockSize;
    slice->bytes = bytes;
    pc_consumed(fifo->data, slice->bytes);
    slice->nextBlock = 0;
    slice->state = SLICE_NEW;
#if 0
    flprintf("Made slice %p %d\n", slice, slice->sliceNo);
#endif
    BZERO(slice->sl_reqack.readySet);
    slice->nrReady = 0;
#ifdef BB_FEATURE_UDPCAST_FEC
    /* each slice slot owns its own region of the shared FEC buffer */
    slice->fec_data = sendst->fec_data + (slot * config->fec_stripes * 
                                          config->fec_redundancy *
                                          config->blockSize);
#endif
    return slice;
}


/*
 * Sends one packet, made of a header and a payload, to the data multicast
 * address of the configuration.
 *
 * The configured rate limit (and, if FLAG_AUTORATE is set, the automatic
 * rate limit) is applied before sending. A failing sendmsg() is reported
 * through udpc_fatal().
 *
 * Always returns 0.
 */
static int sendRawData(int sock,
                   struct net_config *config, 
                   char *header, int headerSize, 
                   char *data, int dataSize)
{
    struct iovec parts[2];
    struct msghdr mh;
    int totalLen = dataSize + headerSize;

    /* gather header and payload into a single datagram */
    parts[0].iov_base = header;
    parts[0].iov_len = headerSize;
    parts[1].iov_base = data;
    parts[1].iov_len = dataSize;

    mh.msg_iov = parts;
    mh.msg_iovlen  = 2;
    mh.msg_name = &config->dataMcastAddr;
    mh.msg_namelen = sizeof(struct sockaddr);
#ifndef __CYGWIN__
    mh.msg_control = 0;
    mh.msg_controllen = 0;
    mh.msg_flags = 0;
#endif

    doRateLimit(config->rateLimit, totalLen);
#ifndef __CYGWIN__
    if(config->flags & FLAG_AUTORATE)
        doAutoRateLimit(sock, config->dir, config->sendbuf, totalLen);
#endif
    if(sendmsg(sock, &mh, 0) < 0) {
        char ipBuffer[16];
        udpc_fatal(1, "Could not broadcast data packet to %s:%d (%s)\n",
                   getIpString(&config->dataMcastAddr, ipBuffer),
                   getPort(&config->dataMcastAddr),
                   strerror(errno));
    }

    return 0;
}


/*
 * Sends data block number i of the given slice.
 *
 * opCode, sliceNo and blockNo are encoded according to the sender's
 * endianness; the bytes field is always written with htonl(). The payload
 * is config->blockSize bytes taken from the FIFO's data buffer at the
 * block's offset (modulo the buffer size).
 *
 * Always returns 0.
 */
static int transmitDataBlock(sender_state_t sendst, struct slice *slice, int i)
{
    struct net_config *config = sendst->config;
    struct fifo *fifo = sendst->fifo;
    struct dataBlock msg;

    assert(i < MAX_SLICE_SIZE);

    msg.reserved = 0;
    msg.reserved2 = 0;
    msg.bytes = htonl(slice->bytes);

    if(sendst->endianness == NET_ENDIAN) {
        msg.opCode  = htons(CMD_DATA);
        msg.sliceNo = htonl(slice->sliceNo);
        msg.blockNo = htons(i);
    } else if(sendst->endianness == PC_ENDIAN) {
        msg.opCode  = htopcs(CMD_DATA);
        msg.sliceNo = htopcl(slice->sliceNo);
        msg.blockNo = htopcs(i);
    }

    sendRawData(sendst->socket, config, 
                (char *) &msg, sizeof(msg),
                fifo->dataBuffer + 
                (slice->base + i * config->blockSize) % fifo->dataBufSize,
                config->blockSize);
    return 0;
}

#ifdef BB_FEATURE_UDPCAST_FEC
/*
 * Sends FEC block number i of the given slice; the payload is taken from the
 * slice's FEC buffer.
 *
 * Nothing is sent for an empty slice unless FLAG_ASYNC is set.
 *
 * Always returns 0.
 */
static int transmitFecBlock(sender_state_t sendst, struct slice *slice, int i)
{
    struct net_config *config = sendst->config;
    struct fecBlock msg;
    int isAsync = config->flags & FLAG_ASYNC;

    /* Do not transmit zero byte FEC blocks if we are not in async mode */
    if(!isAsync && slice->bytes == 0)
        return 0;

    assert(i < config->fec_redundancy * config->fec_stripes);

    msg.opCode  = htons(CMD_FEC);
    msg.stripes = htons(config->fec_stripes);
    msg.sliceNo = htonl(slice->sliceNo);
    msg.blockNo = htons(i);
    msg.reserved2 = 0;
    msg.bytes = htonl(slice->bytes);

    sendRawData(sendst->socket, config, 
                (char *) &msg, sizeof(msg),
                (slice->fec_data + i * config->blockSize), config->blockSize);
    return 0;
}
#endif

/*
 * Transmits (retransmitting == 0) or retransmits (retransmitting != 0) the
 * blocks of a slice, starting at slice->nextBlock.
 *
 * First transmission: only done for a slice in state SLICE_NEW. Data blocks
 * are followed by the FEC blocks if FEC is enabled. The loop stops early as
 * soon as a message is waiting on the return channel; nextBlock remembers
 * where to resume.
 *
 * Retransmission: only done for a slice in state SLICE_XMITTED, always
 * restarting at block 0. Only blocks set in rxmitMap and not yet set in
 * isXmittedMap are sent; skipped blocks advance lastGoodBlock.
 *
 * Returns 0 if the slice was not in the required state, 2 if the end of the
 * slice was reached, 1 if transmission was interrupted before the end.
 */
static int sendSlice(sender_state_t sendst, struct slice *slice,
                 int retransmitting)
{    
    struct net_config *config = sendst->config;
    int dataBlocks; /* number of data blocks in the slice */
    int endBlock;   /* one past the last block index to transmit */
    int blk;
    int rxmitCount = 0;

    if(retransmitting) {
        slice->nextBlock = 0;
        if(slice->state != SLICE_XMITTED)
            return 0;
    } else if(slice->state != SLICE_NEW) {
        return 0;
    }

    dataBlocks = getSliceBlocks(slice, config);
    endBlock = dataBlocks;
#ifdef BB_FEATURE_UDPCAST_FEC
    /* FEC blocks are only sent along with the first transmission */
    if((config->flags & FLAG_FEC) && !retransmitting)
        endBlock += config->fec_redundancy * config->fec_stripes;
#endif

#if DEBUG
    if(retransmitting) {
        flprintf("%s slice %d from %d to %d (%d bytes) %d\n",
                 retransmitting ? "Retransmitting" : "Sending", 
                 slice->sliceNo, slice->nextBlock, dataBlocks, slice->bytes,
                 config->blockSize);
    }
#endif

    /* transmit the data */
    for(blk = slice->nextBlock; blk < endBlock; blk++) {
        if(retransmitting) {
            if(!BIT_ISSET(blk, slice->rxmitMap) ||
               BIT_ISSET(blk, slice->isXmittedMap)) {
                /* if block is not in retransmit list, or has _already_
                 * been retransmitted, skip it */
                if(blk > slice->lastGoodBlock)
                    slice->lastGoodBlock = blk;
                continue;
            }
            SET_BIT(blk, slice->isXmittedMap);
            rxmitCount++;
#if DEBUG
            flprintf("Retransmitting %d.%d\n", slice->sliceNo, blk);
#endif
        }

        if(blk < dataBlocks)
            transmitDataBlock(sendst, slice, blk);
#ifdef BB_FEATURE_UDPCAST_FEC
        else
            transmitFecBlock(sendst, slice, blk - dataBlocks);
#endif

        /* a first transmission yields to pending return-channel messages */
        if(!retransmitting && pc_getWaiting(sendst->rc.incoming)) {
            blk++;
            break;
        }
    }

    if(rxmitCount)
        senderStatsAddRetransmissions(sendst->stats, rxmitCount);
    slice->nextBlock = blk;

    if(blk != endBlock) {
#if DEBUG
        flprintf("Done: at block %d %d %d\n", blk, retransmitting,
                 slice->state);
#endif
        return 1;
    }

    /* reached the end of the slice */
    slice->needRxmit = 0;
    if(!retransmitting)
        slice->state = SLICE_XMITTED;
#if DEBUG
    flprintf("Done: at block %d %d %d\n", blk, retransmitting,
             slice->state);
#endif
    return 2;
}

/*
 * Marks a slice as acknowledged, and works on the slice size.
 *
 * Unless FLAG_SN is set, and while discovery is in DSC_DOUBLING mode, the
 * slice size is increased by a quarter, capped at max_slice_size (reaching
 * the cap switches discovery to DSC_REDUCING). The slice's bytes are then
 * produced into the FIFO's free memory queue and added to the statistics.
 *
 * Does nothing if the slice is already acknowledged. Always returns 0.
 */
static int ackSlice(struct slice *slice, struct net_config *net_config,
                struct fifo *fifo, sender_stats_t stats)
{
    int growing;

    if(slice->state == SLICE_ACKED)
        /* already acked */
        return 0;

    growing = !(net_config->flags & FLAG_SN) &&
        net_config->discovery == DSC_DOUBLING;
    if(growing) {
        net_config->sliceSize += net_config->sliceSize / 4;
        if(net_config->sliceSize >= net_config->max_slice_size) {
            net_config->sliceSize = net_config->max_slice_size;
            net_config->discovery = DSC_REDUCING;
        }
        udpc_logprintf(udpc_log, "Doubling slice size to %d\n", 
                       net_config->sliceSize);
    }

    slice->state = SLICE_ACKED;
    pc_produce(fifo->freeMemQueue, slice->bytes);

    /* Statistics */
    senderStatsAddBytes(stats, slice->bytes);
    if(slice->bytes != 0) {
        displaySenderStats(stats, 
                           net_config->blockSize, net_config->sliceSize);
    }
    /* End Statistics */

    return 0;
}


/*
 * Broadcasts a "request acknowledgement" message for a slice.
 *
 * In async mode nobody answers: a non-empty slice is acknowledged right
 * away, and for an empty slice nothing is sent (when FEC support is
 * compiled in, only if FLAG_FEC is set as well).
 *
 * Otherwise, unless FLAG_SN is set, a repeated request (rxmitId != 0) first
 * shrinks the slice size based on lastGoodBlock, with a minimum of 32. The
 * per-round state is then reset (answered set := ready set, retransmission
 * maps cleared) and the reqack message, including the ready set, is
 * broadcast, subject to rate limiting.
 *
 * Always returns 0.
 */
static int sendReqack(struct slice *slice, struct net_config *net_config,
                  struct fifo *fifo, sender_stats_t stats,
                  int sock,  int endianness)
{
    /* in async mode, just confirm slice... */
    if((net_config->flags & FLAG_ASYNC) && slice->bytes != 0) {
        ackSlice(slice, net_config, fifo, stats);
        return 0;
    }

    if((net_config->flags & FLAG_ASYNC)
#ifdef BB_FEATURE_UDPCAST_FEC
       && 
       (net_config->flags & FLAG_FEC)
#endif
       ) {
        return 0;
    }

    if(!(net_config->flags & FLAG_SN) && slice->rxmitId != 0) {
        int nrBlocks = getSliceBlocks(slice, net_config);
        int good = slice->lastGoodBlock;
#if DEBUG
        flprintf("nrBlocks=%d lastGoodBlock=%d\n",
                 nrBlocks, slice->lastGoodBlock);
#endif
        if(good != 0 && good < nrBlocks) {
            int half = net_config->sliceSize / 2;

            net_config->discovery = DSC_REDUCING;
            net_config->sliceSize = (good < half) ? half : good;
            if(net_config->sliceSize < 32) {
                /* a minimum of 32 */
                net_config->sliceSize = 32;
            }
            udpc_logprintf(udpc_log, "Slice size=%d\n", net_config->sliceSize);
        }
    }

    slice->lastGoodBlock = 0;
#if DEBUG
    flprintf("Send reqack %d.%d\n", slice->sliceNo, slice->rxmitId);
#endif

    /* fill in the message header in the byte order of the session */
    switch(endianness) {
        case PC_ENDIAN:
            slice->sl_reqack.ra.opCode = htopcs(CMD_REQACK);
            slice->sl_reqack.ra.sliceNo = htopcl(slice->sliceNo);
            slice->sl_reqack.ra.bytes = htopcl(slice->bytes);
            slice->sl_reqack.ra.rxmit = htopcl(slice->rxmitId);
            break;
        case NET_ENDIAN:
            slice->sl_reqack.ra.opCode = htons(CMD_REQACK);
            slice->sl_reqack.ra.sliceNo = htonl(slice->sliceNo);
            slice->sl_reqack.ra.bytes = htonl(slice->bytes);
            slice->sl_reqack.ra.rxmit = htonl(slice->rxmitId);
            break;
    }
    slice->sl_reqack.ra.reserved = 0;

    /* start a new round: whoever is ready counts as having answered */
    bcopy((void*)&slice->sl_reqack.readySet,(void*)&slice->answeredSet,
          sizeof(slice->answeredSet));
    slice->nrAnswered = slice->nrReady;

    /* forget the retransmission requests of the previous round */
    slice->needRxmit = 0;
    bzero(slice->rxmitMap, sizeof(slice->rxmitMap));
    bzero(slice->isXmittedMap, sizeof(slice->isXmittedMap));

    doRateLimit(net_config->rateLimit, sizeof(slice->sl_reqack));
#ifndef __CYGWIN__
    if(net_config->flags & FLAG_AUTORATE)
        doAutoRateLimit(sock, net_config->dir, net_config->sendbuf,
                        sizeof(slice->sl_reqack));
#endif
#if DEBUG
    flprintf("sending reqack for slice %d\n", slice->sliceNo);
#endif
    BCAST_DATA(sock, slice->sl_reqack);
    return 0;
}

/**
 * Retransmits the requested blocks of a slice, if it is not acknowledged
 * yet and a retransmission is pending (needRxmit). Always returns 0.
 */
static int doRetransmissions(sender_state_t sendst,
                       struct slice *slice)
{
    if(slice->state != SLICE_ACKED) {
#if DEBUG
        flprintf("Do retransmissions\n");
#endif
        /* FIXME: reduce slice size if needed */
        if(slice->needRxmit)
            sendSlice(sendst, slice, 1); /* do some retransmissions */
    }
    return 0;
}


/*
 * Records that client clNo answered for this slice; each client is only
 * counted once in nrAnswered.
 */
static void markParticipantAnswered(slice_t slice, int clNo)
{
    if(!BIT_ISSET(clNo, slice->answeredSet)) {
        /* first answer of this client */
        slice->nrAnswered++;
        SET_BIT(clNo, slice->answeredSet);
    }
}

/**
 * Handles an ok message: marks client clNo as ready (and as having
 * answered) for the slice. Messages for an unknown slice (NULL) or from an
 * invalid participant are ignored. Always returns 0.
 */
static int handleOk(sender_state_t sendst,
                struct slice *slice,
                int clNo)
{
    if(slice == NULL)
        return 0;

    if(!udpc_isParticipantValid(sendst->rc.participantsDb, clNo)) {
        udpc_flprintf("Invalid participant %d\n", clNo);
        return 0;
    }

    if(!BIT_ISSET(clNo, slice->sl_reqack.readySet)) {
        SET_BIT(clNo, slice->sl_reqack.readySet);
        slice->nrReady++;
#if DEBUG
        flprintf("client %d replied ok for %p %d ready=%d\n", clNo, 
                 slice, slice->sliceNo, slice->nrReady);
#endif      
        senderSetAnswered(sendst->stats, clNo);
        markParticipantAnswered(slice, clNo);
        return 0;
    }

    /* client is already marked ready */
#if DEBUG
    flprintf("client %d is already ready\n", clNo);
#endif
    return 0;
}

/*
 * Handles a retransmission request from client clNo.
 *
 * map is the bitmap sent by the client; every bit that is CLEAR in it is
 * added to the slice's retransmission map (the map is inverted before being
 * or-ed in). rxmit is the request id the client is answering to: answers to
 * a request older than the slice's current rxmitId are dropped.
 *
 * Requests from an invalid participant or for an unknown slice (NULL) are
 * ignored. Always returns 0.
 */
static int handleRetransmit(sender_state_t sendst,
                      struct slice *slice,
                      int clNo, char *map, int rxmit)
{
    unsigned int i;

    if(!udpc_isParticipantValid(sendst->rc.participantsDb, clNo)) {
      udpc_flprintf("Invalid participant %d\n", clNo);
      return 0;
    }
    if(slice == NULL)
      return 0;    

    /* Trace only once slice is known to be non-NULL: the caller passes the
     * result of findSlice(), which is NULL for an unknown slice number. */
#if DEBUG
    flprintf("Handle retransmit %d @%d\n", slice->sliceNo, clNo);
#endif

    if (rxmit < slice->rxmitId) {
#if 0
      flprintf("Late answer\n");
#endif
      /* late answer to previous Req Ack */
      return 0;
    }
#if DEBUG
    logprintf(udpc_log,
            "Received retransmit request for slice %d from client %d\n",
            slice->sliceNo,clNo);
#endif
    for(i=0; i <sizeof(slice->rxmitMap) / sizeof(char); i++) {
      slice->rxmitMap[i] |= ~map[i];
    }
    slice->needRxmit = 1;
    markParticipantAnswered(slice, clNo);
    return 0;
}

/*
 * Removes client clNo from the ready and answered sets of one slice,
 * adjusting the matching counters. A NULL slice is ignored. db is not used.
 * Always returns 0.
 */
static int handleDisconnect1(participantsDb_t db,
                       struct slice *slice,
                       int clNo)
{    
    if(slice == NULL)
        return 0;

    if(BIT_ISSET(clNo, slice->sl_reqack.readySet)) {
        /* avoid counting client both as left and ready */
        slice->nrReady--;
        CLR_BIT(clNo, slice->sl_reqack.readySet);
    }
    if(BIT_ISSET(clNo, slice->answeredSet)) {
        CLR_BIT(clNo, slice->answeredSet);
        slice->nrAnswered--;
    }
    return 0;
}

/*
 * Handles the disconnection of client clNo: forgets it in both slices,
 * removes it from the participants database, and exits the program when no
 * participant is left. Returns 0 otherwise.
 */
static int handleDisconnect(participantsDb_t db,
                      struct slice *slice1,
                      struct slice *slice2,
                      int clNo)
{
    struct slice *affected[2];
    int k;

    affected[0] = slice1;
    affected[1] = slice2;
    for(k = 0; k < 2; k++)
        handleDisconnect1(db, affected[k], clNo);

    udpc_removeParticipant(db, clNo);
    if(udpc_nrParticipants(db) == 0)
        exit(0);
    return 0;
}

/*
 * Returns whichever of the two slices carries number sliceNo (slice1 is
 * checked first), or NULL if neither does. Either slice may be NULL.
 */
static struct slice *findSlice(struct slice *slice1,
                         struct slice *slice2,
                         int sliceNo)
{
    struct slice *candidates[2];
    int k;

    candidates[0] = slice1;
    candidates[1] = slice2;
    for(k = 0; k < 2; k++) {
        if(candidates[k] != NULL && candidates[k]->sliceNo == sliceNo)
            return candidates[k];
    }
    return NULL;
}

/*
 * Takes the next message out of the return channel's incoming queue and
 * dispatches it on its opcode (ok, disconnect, retransmit). The queue slot
 * is handed back to the return channel's free queue afterwards.
 *
 * xmitSlice and rexmitSlice are the two slices currently in flight; either
 * may be NULL. Always returns 0.
 */
static int handleNextMessage(sender_state_t sendst,
                       struct slice *xmitSlice,
                       struct slice *rexmitSlice)
{
    int pos = pc_getConsumerPosition(sendst->rc.incoming);
    union message *msg = &sendst->rc.q[pos].msg;
    int clNo = sendst->rc.q[pos].clNo;
    /* NOTE(review): not referenced by name below; presumably picked up by
     * the xtohs()/xtohl() macros -- confirm against their definition */
    int endianness = sendst->endianness;

#if DEBUG
    flprintf("handle next message\n");
#endif

    pc_consumeAny(sendst->rc.incoming);
    switch(xtohs(msg->opCode)) {
      case CMD_OK:
          handleOk(sendst, 
                 findSlice(xmitSlice, rexmitSlice, xtohl(msg->ok.sliceNo)),
                 clNo);
          break;
      case CMD_DISCONNECT:
          handleDisconnect(sendst->rc.participantsDb, 
                       xmitSlice, rexmitSlice, clNo);
          break;      
      case CMD_RETRANSMIT:
#if DEBUG
          flprintf("Received retransmittal request for %ld from %d:\n",
                 (long) xtohl(msg->retransmit.sliceNo), clNo);
#endif
          /* rxmit is a wire-format field like sliceNo (sendReqack() sends
           * it through htonl()/htopcl()), so it has to be converted to
           * host order before handleRetransmit() compares it against
           * slice->rxmitId */
          handleRetransmit(sendst,
                       findSlice(xmitSlice, rexmitSlice,
                               xtohl(msg->retransmit.sliceNo)),
                       clNo,
                       msg->retransmit.map,
                       xtohl(msg->retransmit.rxmit));
          break;
      default:
          udpc_flprintf("Bad command %04x\n", 
                    (unsigned short) msg->opCode);
          break;
    }
    pc_consumed(sendst->rc.incoming, 1);
    pc_produce(sendst->rc.free, 1);
    return 0;
}

/*
 * Thread body of the return channel.
 *
 * For each free queue slot: polls the socket with select() (100 microsecond
 * timeout) until it returns non-zero, checking the stopIt flag before every
 * poll; then receives one message into the slot. Messages from a sender
 * that is not a known participant are dropped and the slot is reused;
 * others are tagged with the client number and passed to the incoming
 * queue.
 *
 * Returns NULL once stopIt is seen.
 */
static void *returnChannelMain(void *args) {
    struct returnChannel *rc = args;
    fd_set fds;
    int nready = 0;

    FD_ZERO(&fds);

    for(;;) {
        struct sockaddr from;
        int clNo;
        int slot = pc_getConsumerPosition(rc->free);

        pc_consumeAny(rc->free);
        do {
            struct timeval tv;

            if(rc->stopIt)
                return NULL;
            FD_SET(rc->socket,&fds);
            tv.tv_sec=0;
            tv.tv_usec=100;
            nready = select(rc->socket+1,
                            &fds, NULL, NULL, &tv);
        } while(nready == 0);

        RECV(rc->socket, 
             rc->q[slot].msg, from,
             rc->config->portBase);
        clNo = udpc_lookupParticipant(rc->participantsDb, &from);
        if(clNo >= 0) {
            rc->q[slot].clNo = clNo;
            pc_consumed(rc->free, 1);
            pc_produce(rc->incoming, 1);
        }
        /* else: packet from unknown provenance, slot stays free */
    }
}


/*
 * Sets up the return channel on socket sock: creates the free queue (all
 * QUEUE_SIZE slots initially free) and the incoming queue, then starts the
 * receiving thread.
 */
static void initReturnChannel(struct returnChannel *returnChannel,
                        struct net_config *config,
                        int sock) {
    returnChannel->socket = sock;
    returnChannel->config = config;
    returnChannel->stopIt = 0;

    returnChannel->free = pc_makeProduconsum(QUEUE_SIZE,"msg:free-queue");
    pc_produce(returnChannel->free, QUEUE_SIZE);
    returnChannel->incoming = pc_makeProduconsum(QUEUE_SIZE,"msg:incoming");

    pthread_create(&returnChannel->thread, NULL,
                   returnChannelMain, returnChannel);
}

/*
 * Asks the return channel thread to stop (via the stopIt flag) and waits
 * until it has terminated.
 */
static void cancelReturnChannel(struct returnChannel *returnChannel)
{
    pthread_t receiver;

    returnChannel->stopIt = 1;
    receiver = returnChannel->thread;
    pthread_join(receiver, NULL);
}

/*
 * Main loop of the network sender thread (args0 is the sender_state_t).
 *
 * After choosing the initial slice size, each pass of the loop performs the
 * first applicable step and restarts:
 *   1. acknowledge/free the slice in the retransmit slot once all
 *      participants are ready;
 *   2. move a fully transmitted slice to the retransmit slot and send a
 *      first request for acknowledgement;
 *   3. handle one pending message from the return channel;
 *   4. perform pending retransmissions;
 *   5. send a new request once every participant has answered;
 *   6. fetch and transmit a new slice;
 *   7. otherwise wait for an answer, with a timeout derived from the running
 *      average of previous waits; on timeout repeat the request, and drop
 *      the participants that are still not ready once rxmitId exceeds
 *      retriesUntilDrop.
 *
 * The loop ends when all data is sent and acknowledged, when no participant
 * is left, or (in async mode) as soon as there is nothing left to transmit.
 * The return channel is stopped before returning. Returns 0 (NULL).
 */
static void *netSenderMain(void     *args0)
{
    sender_state_t sendst = (sender_state_t) args0;
    struct net_config *config = sendst->config;
    struct timeval tv;
    struct timespec ts;
    int atEnd = 0;
    int firstWait=1;
    unsigned long waitAverage=10000; /* Exponential average of last wait times */

    struct slice *xmitSlice=NULL; /* slice being transmitted a first time */
    struct slice *rexmitSlice=NULL; /* slice being re-transmitted */
    int sliceNo = 0;

    /* choose the initial slice size */
    if(config->default_slice_size == 0) {
#ifdef BB_FEATURE_UDPCAST_FEC
      if(config->flags & FLAG_FEC) {
          config->sliceSize = 
            config->fec_stripesize * config->fec_stripes;
      } else
#endif
        if(config->flags & FLAG_ASYNC)
          config->sliceSize = 1024;
      else if (sendst->config->flags & FLAG_SN) {
          sendst->config->sliceSize = 112;
      } else
          sendst->config->sliceSize = 130;
      sendst->config->discovery = DSC_DOUBLING;
    } else {
      config->sliceSize = config->default_slice_size;
#ifdef BB_FEATURE_UDPCAST_FEC
      if((config->flags & FLAG_FEC) &&
         (config->sliceSize > 128 * config->fec_stripes))
          config->sliceSize = 128 * config->fec_stripes;
#endif
    }
    if(config->sliceSize > MAX_SLICE_SIZE)
      config->sliceSize = MAX_SLICE_SIZE;

    assert(config->sliceSize <= MAX_SLICE_SIZE);

    do {
      /* first, cleanup rexmit Slice if needed */

      if(rexmitSlice != NULL) {
          if(rexmitSlice->nrReady == 
             udpc_nrParticipants(sendst->rc.participantsDb)){
#if DEBUG
            flprintf("slice is ready\n");
#endif
            ackSlice(rexmitSlice, sendst->config, sendst->fifo, 
                   sendst->stats);
          }
          if(isSliceAcked(rexmitSlice)) {
            freeSlice(sendst, rexmitSlice);
            rexmitSlice = NULL;
          }
      }

      /* then shift xmit slice to rexmit slot, if possible */
      if(rexmitSlice == NULL &&  xmitSlice != NULL && 
         isSliceXmitted(xmitSlice)) {
          rexmitSlice = xmitSlice;
          xmitSlice = NULL;
          sendReqack(rexmitSlice, sendst->config, sendst->fifo, sendst->stats,
                   sendst->socket, sendst->endianness);
      }

      /* handle any messages */
      if(pc_getWaiting(sendst->rc.incoming)) {
#if DEBUG
          flprintf("Before message %d\n",
                pc_getWaiting(sendst->rc.incoming));
#endif
          handleNextMessage(sendst, xmitSlice, rexmitSlice);

          /* restart at beginning of loop: we may have acked the rxmit
           * slice, making it possible to shift the pipe */
          continue;
      }

      /* do any needed retransmissions */
      if(rexmitSlice != NULL && rexmitSlice->needRxmit) {
          doRetransmissions(sendst, rexmitSlice);
          /* restart at beginning: new messages may have arrived during
           * retransmission  */
          continue;
      }

      /* if all participants answered, send req ack */
      if(rexmitSlice != NULL && 
         rexmitSlice->nrAnswered == 
         udpc_nrParticipants(sendst->rc.participantsDb)) {
          rexmitSlice->rxmitId++;
          sendReqack(rexmitSlice, sendst->config, sendst->fifo, sendst->stats,
                   sendst->socket, sendst->endianness);
      }

      /* get a new slice to transmit; without FLAG_SN only while no slice is
       * waiting for acknowledgement */
      if(xmitSlice == NULL && !atEnd) {
#if DEBUG
          flprintf("SN=%d\n", sendst->config->flags & FLAG_SN);
#endif
          if((sendst->config->flags & FLAG_SN) ||
             rexmitSlice == NULL) {
#ifdef BB_FEATURE_UDPCAST_FEC
            if(sendst->config->flags & FLAG_FEC) {
                /* with FEC, slices are prepared by the FEC thread */
                int i;
                pc_consume(sendst->fec_data_pc, 1);
                i = pc_getConsumerPosition(sendst->fec_data_pc);
                xmitSlice = &sendst->slices[i];
                pc_consumed(sendst->fec_data_pc, 1);
            } else
#endif
              {
                xmitSlice = makeSlice(sendst, sliceNo++);
            }
            /* an empty slice marks the end of the data */
            if(xmitSlice->bytes == 0)
                atEnd = 1;
          }
      }
       
      if(xmitSlice != NULL && xmitSlice->state == SLICE_NEW) {
          sendSlice(sendst, xmitSlice, 0);
#if DEBUG
          flprintf("%d Interrupted at %d/%d\n", xmitSlice->sliceNo, 
                 xmitSlice->nextBlock, 
                 getSliceBlocks(xmitSlice, sendst->config));
#endif
          continue;
      }
      if(atEnd && rexmitSlice == NULL && xmitSlice == NULL)
          break;

      if(sendst->config->flags & FLAG_ASYNC)
          break;

#if DEBUG
      flprintf("Waiting for timeout...\n");
#endif
      /* absolute deadline: now + 1.1 times the average wait */
      gettimeofday(&tv, 0);
      ts.tv_sec = tv.tv_sec;
      ts.tv_nsec = (tv.tv_usec + 1.1*waitAverage) * 1000;

#ifdef __CYGWIN__
      /* Windows has a granularity of 1 millisecond in its timer. Take this
       * into account here */
      #define GRANULARITY 1000000
      ts.tv_nsec += 3*GRANULARITY/2;
      ts.tv_nsec -= ts.tv_nsec % GRANULARITY;
#endif

#define BILLION 1000000000

      while(ts.tv_nsec >= BILLION) {
          ts.tv_nsec -= BILLION;
          ts.tv_sec++;
      }

      /* rexmitSlice is only checked for NULL further down ("Weird.
       * Timeout..."), so it must not be dereferenced unguarded here */
      if(rexmitSlice != NULL && rexmitSlice->rxmitId > 10)
          /* after tenth retransmission, wait minimum one second */
          ts.tv_sec++;

      if(pc_consumeAnyWithTimeout(sendst->rc.incoming, &ts) != 0) {
#if DEBUG
          flprintf("Have data\n");
#endif
          {
            /* an answer arrived: fold the time it took into the average */
            struct timeval tv2;
            unsigned long timeout;
            gettimeofday(&tv2, 0);
            timeout = 
                (tv2.tv_sec - tv.tv_sec) * 1000000+
                tv2.tv_usec - tv.tv_usec;
            if(!firstWait)
                timeout += waitAverage;
            waitAverage += 9; /* compensate against rounding errors */
            waitAverage = (0.9 * waitAverage + 0.1 * timeout);
          }
          firstWait = 1;
          continue;
      }
      if(rexmitSlice == NULL) {
          udpc_flprintf("Weird. Timeout and no rxmit slice");
          break;
      }
      if(!firstWait){
#ifndef __CYGWIN__
          /* on Cygwin, we would get too many of those messages... */
          udpc_flprintf("Timeout notAnswered=");
          udpc_printNotSet(sendst->rc.participantsDb, 
                       rexmitSlice->answeredSet);
          udpc_flprintf(" notReady=");
          udpc_printNotSet(sendst->rc.participantsDb, rexmitSlice->sl_reqack.readySet);
          udpc_flprintf(" nrAns=%d nrRead=%d nrPart=%d avg=%ld\n",
                    rexmitSlice->nrAnswered,
                    rexmitSlice->nrReady,
                    udpc_nrParticipants(sendst->rc.participantsDb),
                    waitAverage);
#endif
      }
      firstWait=0;
      if(rexmitSlice->rxmitId > config->retriesUntilDrop) {
          /* too many retries: give up on everybody who is not ready */
          int i;
          for(i=0; i < MAX_CLIENTS; i++) {
              if(udpc_isParticipantValid(sendst->rc.participantsDb, i) && 
                 !BIT_ISSET(i, rexmitSlice->sl_reqack.readySet)) {
                  udpc_flprintf("Dropping client #%d because of timeout\n",
                            i);
                  udpc_removeParticipant(sendst->rc.participantsDb, i);
                  if(udpc_nrParticipants(sendst->rc.participantsDb) == 0)
                      exit(0);
              }
          }
          continue;
      }
      rexmitSlice->rxmitId++;
      sendReqack(rexmitSlice, sendst->config, sendst->fifo, sendst->stats,
               sendst->socket, sendst->endianness);
    } while(udpc_nrParticipants(sendst->rc.participantsDb)||
          (config->flags & FLAG_ASYNC));
    cancelReturnChannel(&sendst->rc);
    return 0;
}


/*
 * Address, within the FIFO's data buffer, of block number x of the current
 * slice, for a block size of bs. The offset wraps around at the end of the
 * buffer. Relies on variables named `fifo` and `slice` being in scope at the
 * point of use. Both arguments are parenthesised so that expressions may be
 * passed safely.
 */
#define ADR(x, bs) (fifo->dataBuffer + \
      (slice->base+(x)*(bs)) % fifo->dataBufSize)

#ifdef BB_FEATURE_UDPCAST_FEC
/*
 * Computes the FEC blocks of a slice.
 *
 * The data blocks of the slice are distributed round-robin over
 * config->fec_stripes stripes (block i belongs to stripe i % stripes). For
 * every stripe, config->fec_redundancy FEC blocks are computed with
 * fec_encode() and stored in the slice's FEC buffer at block index
 * stripe + i * stripes.
 *
 * If the last data block is only partially filled, its remainder is zeroed
 * in the FIFO buffer first, so that whole blocks are encoded.
 */
static void fec_encode_all_stripes(sender_state_t sendst,
                           struct slice *slice)
{
    int stripe;
    struct net_config *config = sendst->config;
    struct fifo *fifo = sendst->fifo;
    int bytes = slice->bytes;
    int stripes = config->fec_stripes;
    int redundancy = config->fec_redundancy;
    int nrBlocks = (bytes + config->blockSize - 1) / config->blockSize;
    int leftOver = bytes % config->blockSize;
    unsigned char *fec_data = slice->fec_data;

    unsigned char *fec_blocks[redundancy];
    unsigned char *data_blocks[128];

    if(leftOver) {
      char *lastBlock = ADR(nrBlocks - 1, config->blockSize);
      bzero(lastBlock+leftOver, config->blockSize-leftOver);
    }

    for(stripe=0; stripe<stripes; stripe++) {
      int i,j;
      for(i=0; i<redundancy; i++)
          fec_blocks[i] = fec_data+config->blockSize*(stripe+i*stripes);
      for(i=stripe, j=0; i< nrBlocks; i+=stripes, j++) {
          /* a stripe must never hold more data blocks than data_blocks[]
           * has room for; catch it instead of overrunning the stack */
          assert(j < (int) (sizeof(data_blocks) / sizeof(data_blocks[0])));
          data_blocks[j] = ADR(i, config->blockSize);
      }
      fec_encode(config->blockSize, data_blocks, j, fec_blocks, redundancy);

    }
}


/*
 * Thread body of the FEC thread: endlessly takes a free slice, fills it
 * from the FIFO (makeSlice), computes its FEC blocks, and hands it to the
 * sender through the fec_data_pc queue.
 */
static void *fecMain(void *args0)
{
    sender_state_t sendst = args0;
    int nextSliceNo = 0;

    for(;;) {
        /* consume free slice */
        slice_t slice = makeSlice(sendst, nextSliceNo);

        nextSliceNo++;
        /* do the fec calculation here */
        fec_encode_all_stripes(sendst, slice);
        pc_produce(sendst->fec_data_pc, 1);
    }
    return NULL;
}
#endif

/*
 * Creates the sender state and starts the sender threads.
 *
 * Allocates the state (and, when FEC support is compiled in, the FEC buffer
 * holding fec_stripes * fec_redundancy blocks per slice), starts the return
 * channel, marks all slices free, starts the FEC thread if FLAG_FEC is set,
 * and finally starts the network sender thread, whose id is stored in
 * fifo->thread.
 *
 * Always returns 0.
 */
int spawnNetSender(struct fifo *fifo,
               int sock,
               int endianness,
               struct net_config *config,
               participantsDb_t db,
               sender_stats_t stats)
{
    int n;
    sender_state_t sendst = MALLOC(struct senderState);

    sendst->config = config;
    sendst->fifo = fifo;
    sendst->stats = stats;
    sendst->socket = sock;
    sendst->endianness = endianness;
#ifdef BB_FEATURE_UDPCAST_FEC
    sendst->fec_data =  xmalloc(NR_SLICES *
                                config->fec_stripes * 
                                config->fec_redundancy *
                                config->blockSize);
#endif
    sendst->rc.participantsDb = db;
    initReturnChannel(&sendst->rc, config, sock);

    /* all slices start out free */
    sendst->free_slices_pc = pc_makeProduconsum(NR_SLICES, "free slices");
    pc_produce(sendst->free_slices_pc, NR_SLICES);
    n = 0;
    while(n < NR_SLICES) {
        sendst->slices[n].state = SLICE_FREE;
        n++;
    }

#ifdef BB_FEATURE_UDPCAST_FEC
    if(config->flags & FLAG_FEC) {
        /* Free memory queue is initially full */
        fec_init();
        sendst->fec_data_pc = pc_makeProduconsum(NR_SLICES, "fec data");

        sendst->fec_thread = 0;
        pthread_create(&sendst->fec_thread, NULL, fecMain, sendst);
    }
#endif

    fifo->thread = 0;
    pthread_create(&fifo->thread, NULL, netSenderMain, sendst);
    return 0;
}

Generated by  Doxygen 1.6.0   Back to index