23d22
< *		Ira Burton	:	Support for SOCK_CLUSTER
262d260
< 
819,836d816
< 
< 
< 
< 
< /* Ira Burton
<  * Marks the current SKB to be sent, and then transmits the queue of frames.
<  */
< static __inline__ void cluster_push(struct sock *sk, struct tcp_opt *tp, int flags, int mss_now, int nonagle)
< {
< 	if (tp->send_head) {
< 		struct sk_buff *skb = sk->write_queue.prev;
< 		if (!(flags&MSG_MORE) || forced_push(tp))
< 			tcp_mark_push(tp, skb);
< 		cluster_push_pending_frames(sk, tp, mss_now, (flags&MSG_MORE) ? 2 : nonagle);
< 	}
< }
< 
< 
997,1014d976
< 
< 
< /* Ira Burton
<  * Copies data from user and nulls the checksum, similar to cluster_skb_add_data,
<  * except the entire message wasn't copied.
<  */
< static __inline__ void cluster_copy_to_page(struct sock *sk, char *from, struct sk_buff *skb,
< 					    struct page *page, int off, int copy)
< {
< 	copy_from_user(page_address(page)+off, from, copy);
< 	skb->csum = 0;
< 	skb->len += copy;
< 	skb->data_len += copy;
< 	skb->truesize += copy;
< 	sk->wmem_queued += copy;
< 	sk->forward_alloc -= copy;
< }
< 
1033,1047d994
< 
< 
< /* Ira Burton
<  * Copies data from user space and nulls the checksum.
<  */
< static __inline__ void cluster_skb_add_data(struct sk_buff *skb, char *from, int copy)
< {
< 	copy_from_user(skb_put(skb, copy), from, copy);
< 	skb->csum = 0;
< }
< 
< 
< 
< 
< 
1263,1457d1209
< 
< 
< /* Ira Burton
<  * This is called when the user sends a msg. Attempts to copy the entire
<  * msg to the skb and send it. If there is not enough space, copy as much as
<  * possible and repeat until the message is sent. Very similar to the TCP code,
<  * except it calls cluster functions.
<  * If we don't have a connection, let the TCP code handle it.
<  */
< int cluster_sendmsg(struct sock *sk, struct msghdr *msg, int size)
< {
< 	struct iovec *iov;
< 	struct tcp_opt *tp;
< 	struct sk_buff *skb;
< 	int iovlen, flags;
< 	int mss_now;
< 	int err, copied;
< 	long timeo;
< 
< 	tp = &(sk->tp_pinfo.af_tcp);
< 
< 	lock_sock(sk);
< 
< 	flags = msg->msg_flags;
< 	timeo = sock_sndtimeo(sk, flags&MSG_DONTWAIT);
< 
< 	/* Wait for a connection to finish. */
< 	if ((1 << sk->state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
< 		if((err = wait_for_tcp_connect(sk, flags, &timeo)) != 0)
< 			goto out_err;
< 
< 	clear_bit(SOCK_ASYNC_NOSPACE, &sk->socket->flags);
< 
< 	mss_now = tcp_current_mss(sk);
< 
< 
< 	/* Ok commence sending. */
< 	iovlen = msg->msg_iovlen;
< 	iov = msg->msg_iov;
< 	copied = 0;
< 
< 	err = -EPIPE;
< 
< 	if (sk->err || (sk->shutdown&SEND_SHUTDOWN))
< 		goto do_error;
< 
< 	while (--iovlen >= 0) {
< 		int seglen=iov->iov_len;
< 		unsigned char * from=iov->iov_base;
< 
< 		iov++;
< 
< 		while (seglen > 0) {
< 			int copy;
< 
< 			skb = sk->write_queue.prev;
< 
< 			if (tp->send_head == NULL ||
< 			    (copy = mss_now - skb->len) <= 0) {
< 
< new_segment:
< 				/* Allocate new segment. If the interface is SG,
< 				 * allocate skb fitting to single page.
< 				 */
< 				if (!tcp_memory_free(sk))
< 					goto wait_for_sndbuf;
< 
< 				skb = tcp_alloc_pskb(sk, select_size(sk, tp), 0, sk->allocation);
< 				if (skb == NULL)
< 					goto wait_for_memory;
< 
< 				skb_entail(sk, tp, skb);
< 				copy = mss_now;
< 			}
< 
< 			/* Try to append data to the end of skb. */
< 			if (copy > seglen)
< 				copy = seglen;
< 
< 			/* Where to copy to? */
< 			if (skb_tailroom(skb) > 0) {
< 				/* We have some space in skb head. Superb! */
< 				if (copy > skb_tailroom(skb))
< 					copy = skb_tailroom(skb);
< 				cluster_skb_add_data(skb, from, copy);
< 
< 			} else {
< 				int merge = 0;
< 				int i = skb_shinfo(skb)->nr_frags;
< 				struct page *page = TCP_PAGE(sk);
< 				int off = TCP_OFF(sk);
< 
< 				if (can_coalesce(skb, i, page, off) && off != PAGE_SIZE) {
< 					/* We can extend the last page fragment. */
< 					merge = 1;
< 				} else if (i == MAX_SKB_FRAGS ||
< 					   (i == 0 && !(sk->route_caps&NETIF_F_SG))) {
< 					/* Need to add new fragment and cannot
< 					 * do this because interface is non-SG,
< 					 * or because all the page slots are busy.
< 					 */
< 					tcp_mark_push(tp, skb);
< 					goto new_segment;
< 				} else if (page) {
< 					/* If page is cached, align
< 					 * offset to L1 cache boundary
< 					 */
< 					off = (off+L1_CACHE_BYTES-1)&~(L1_CACHE_BYTES-1);
< 					if (off == PAGE_SIZE) {
< 						put_page(page);
< 						TCP_PAGE(sk) = page = NULL;
< 					}
< 				}
< 
< 				if (!page) {
< 					/* Allocate new cache page. */
< 					if (!(page=tcp_alloc_page(sk)))
< 						goto wait_for_memory;
< 					off = 0;
< 				}
< 				if (copy > PAGE_SIZE-off)
< 					copy = PAGE_SIZE-off;
< 
< 				/* Time to copy data. We are close to the end! */
< 				cluster_copy_to_page(sk, from, skb, page, off, copy);
< 
< 				/* Update the skb. */
< 				if (merge) {
< 					skb_shinfo(skb)->frags[i-1].size += copy;
< 				} else {
< 					fill_page_desc(skb, i, page, off, copy);
< 					if (TCP_PAGE(sk)) {
< 						get_page(page);
< 					} else if (off + copy < PAGE_SIZE) {
< 						get_page(page);
< 						TCP_PAGE(sk) = page;
< 					}
< 				}
< 
< 				TCP_OFF(sk) = off+copy;
< 			}
< 			if (!copied)
< 				TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
< 
< 			tp->write_seq += copy;
< 			TCP_SKB_CB(skb)->end_seq += copy;
< 
< 			from += copy;
< 			copied += copy;
< 			seglen -= copy;
< 
< 			if (skb->len != mss_now || (flags&MSG_OOB))
< 				continue;
< 			if (forced_push(tp)) {
< 				tcp_mark_push(tp, skb);
< 				cluster_push_pending_frames(sk, tp, mss_now, 1);
< 			} else if (skb == tp->send_head)
< 				cluster_push_one(sk, mss_now);
< 			continue;
< 
< wait_for_sndbuf:
< 			set_bit(SOCK_NOSPACE, &sk->socket->flags);
< wait_for_memory:
< 			if (copied)
< 				cluster_push(sk, tp, flags&~MSG_MORE, mss_now, 1);
< 
< 			if ((err = wait_for_tcp_memory(sk, &timeo)) != 0)
< 				goto do_error;
< 
< 			mss_now = tcp_current_mss(sk);
< 
< 		}
< 	}
< 
< out:
< 	if (copied)
< 		cluster_push(sk, tp, flags, mss_now, tp->nonagle);
< 	release_sock(sk);
< 	return copied;
< 
< do_error:
< 	if (copied)
< 		goto out;
< out_err:
< 	err = tcp_error(sk, flags, err);
< 	release_sock(sk);
< 	return err;
< 
< }
< 
< 
< 
< 
< 
1582,1645d1333
< 
< 
< 
< /* Ira Burton
<  * Clean up the receive buffer for full frames taken by the user,
<  * then send an ACK if necessary. COPIED is the number of bytes
<  * tcp_recvmsg has given to the user so far, it speeds up the
<  * calculation of whether or not we must ACK for the sake of
<  * a window update. Identical to TCP code except it calls
<  * cluster functions.
<  */
< static __inline__ void cluster_cleanup_rbuf(struct sock *sk, int copied)
< {
< 	struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
< 	int time_to_ack = 0;
< 
< 	if (tcp_ack_scheduled(tp)) {
< 		/* Delayed ACKs frequently hit locked sockets during bulk receive. */
< 		if (tp->ack.blocked
< 		    /* Once-per-two-segments ACK was not sent by tcp_input.c */
< 		    || tp->rcv_nxt - tp->rcv_wup > tp->ack.rcv_mss
< 		    /*
< 		     * If this read emptied read buffer, we send ACK, if
< 		     * connection is not bidirectional, user drained
< 		     * receive buffer and there was a small segment
< 		     * in queue.
< 		     */
< 		    || (copied > 0 &&
< 			(tp->ack.pending&TCP_ACK_PUSHED) &&
< 			!tp->ack.pingpong &&
< 			atomic_read(&sk->rmem_alloc) == 0)) {
< 			time_to_ack = 1;
< 		}
< 	}
< 
< 	/* We send an ACK if we can now advertise a non-zero window
< 	 * which has been raised "significantly".
< 	 *
< 	 * Even if window raised up to infinity, do not send window open ACK
< 	 * in states, where we will not receive more. It is useless.
< 	 */
< 	if(copied > 0 && !time_to_ack && !(sk->shutdown&RCV_SHUTDOWN)) {
< 		__u32 rcv_window_now = tcp_receive_window(tp);
< 
< 		/* Optimize, __tcp_select_window() is not cheap. */
< 		if (2*rcv_window_now <= tp->window_clamp) {
< 			__u32 new_window = __tcp_select_window(sk);
< 
< 			/* Send ACK now, if this read freed lots of space
< 			 * in our buffer. Certainly, new_window is new window.
< 			 * We can advertise it now, if it is not less than current one.
< 			 * "Lots" means "at least twice" here.
< 			 */
< 			if(new_window && new_window >= 2*rcv_window_now)
< 				time_to_ack = 1;
< 		}
< 	}
< 	if (time_to_ack)
< 		cluster_send_ack(sk);
< }
< 
< 
1998,2254d1685
< 
< 
< /* Ira Burton
<  * Called when user space wants to receive a msg. Read from the queue until
<  * enough data has been read; if not enough data is queued, wait for more from the device.
<  * Very similar to TCP code except it makes cluster calls.
<  */
< int cluster_recvmsg(struct sock *sk, struct msghdr *msg,
< 		    int len, int nonblock, int flags, int *addr_len)
< {
< 	struct tcp_opt *tp = &(sk->tp_pinfo.af_tcp);
< 	int copied = 0;
< 	u32 peek_seq;
< 	u32 *seq;
< 	unsigned long used;
< 	int err;
< 	int target;		/* Read at least this many bytes */
< 	long timeo;
< 	struct task_struct *user_recv = NULL;
< 
< 	lock_sock(sk);
< 
< 	err = -ENOTCONN;
< 	if (sk->state == TCP_LISTEN)
< 		goto out;
< 
< 	timeo = sock_rcvtimeo(sk, nonblock);
< 
< 	seq = &tp->copied_seq;
< 	if (flags & MSG_PEEK) {
< 		peek_seq = tp->copied_seq;
< 		seq = &peek_seq;
< 	}
< 
< 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
< 	do {
< 		struct sk_buff * skb;
< 		u32 offset;
< 
< 		/* We need to check signals first, to get correct SIGURG
< 		 * handling. FIXME: Need to check this doesnt impact 1003.1g
< 		 * and move it down to the bottom of the loop
< 		 */
< 		if (signal_pending(current)) {
< 			if (copied)
< 				break;
< 			copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
< 			break;
< 		}
< 
< 		/* Next get a buffer. */
< 
< 		skb = skb_peek(&sk->receive_queue);
< 		do {
< 			if (!skb)
< 				break;
< 
< 			offset = *seq - TCP_SKB_CB(skb)->seq;
< 			if (skb->h.th->syn)
< 				offset--;
< 			if (offset < skb->len)
< 				goto found_ok_skb;
< 			if (skb->h.th->fin)
< 				goto found_fin_ok;
< 			BUG_TRAP(flags&MSG_PEEK);
< 			skb = skb->next;
< 		} while (skb != (struct sk_buff *)&sk->receive_queue);
< 
< 		/* Well, if we have backlog, try to process it now yet. */
< 		if (copied >= target && sk->backlog.tail == NULL)
< 			break;
< 
< 		if (copied) {
< 			if (sk->err ||
< 			    sk->state == TCP_CLOSE ||
< 			    (sk->shutdown & RCV_SHUTDOWN) ||
< 			    !timeo ||
< 			    (flags & MSG_PEEK))
< 				break;
< 		} else {
< 			if (sk->done)
< 				break;
< 
< 			if (sk->err) {
< 				copied = sock_error(sk);
< 				break;
< 			}
< 
< 			if (sk->shutdown & RCV_SHUTDOWN)
< 				break;
< 
< 			if (sk->state == TCP_CLOSE) {
< 				if (!sk->done) {
< 					/* This occurs when user tries to read
< 					 * from never connected socket.
< 					 */
< 					copied = -ENOTCONN;
< 					break;
< 				}
< 				break;
< 			}
< 
< 			if (!timeo) {
< 				copied = -EAGAIN;
< 				break;
< 			}
< 		}
< 		cluster_cleanup_rbuf(sk, copied);
< 
< 		if (tp->ucopy.task == user_recv) {
< 			/* Install new reader */
< 			if (user_recv == NULL && !(flags&(MSG_TRUNC|MSG_PEEK))) {
< 				user_recv = current;
< 				tp->ucopy.task = user_recv;
< 				tp->ucopy.iov = msg->msg_iov;
< 			}
< 
< 			tp->ucopy.len = len;
< 
< 			BUG_TRAP(tp->copied_seq == tp->rcv_nxt || (flags&(MSG_PEEK|MSG_TRUNC)));
< 
< 			/* Ugly... If prequeue is not empty, we have to
< 			 * process it before releasing socket, otherwise
< 			 * order will be broken at second iteration.
< 			 * More elegant solution is required!!!
< 			 *
< 			 * Look: we have the following (pseudo)queues:
< 			 *
< 			 * 1. packets in flight
< 			 * 2. backlog
< 			 * 3. prequeue
< 			 * 4. receive_queue
< 			 *
< 			 * Each queue can be processed only if the next ones
< 			 * are empty. At this point we have empty receive_queue.
< 			 * But prequeue _can_ be not empty after second iteration,
< 			 * when we jumped to start of loop because backlog
< 			 * processing added something to receive_queue.
< 			 * We cannot release_sock(), because backlog contains
< 			 * packets arrived _after_ prequeued ones.
< 			 *
< 			 * Shortly, algorithm is clear --- to process all
< 			 * the queues in order. We could make it more directly,
< 			 * requeueing packets from backlog to prequeue, if
< 			 * is not empty. It is more elegant, but eats cycles,
< 			 * unfortunately.
< 			 */
< 			if (skb_queue_len(&tp->ucopy.prequeue))
< 				goto do_prequeue;
< 		}
< 
< 		if (copied >= target) {
< 			/* Do not sleep, just process backlog. */
< 			release_sock(sk);
< 			lock_sock(sk);
< 		} else {
< 			timeo = tcp_data_wait(sk, timeo);
< 		}
< 
< 		if (user_recv) {
< 			int chunk;
< 
< 			/* __ Restore normal policy in scheduler __ */
< 			if ((chunk = len - tp->ucopy.len) != 0) {
< 				net_statistics[smp_processor_id()*2+1].TCPDirectCopyFromBacklog += chunk;
< 				len -= chunk;
< 				copied += chunk;
< 			}
< 
< 			if (tp->rcv_nxt == tp->copied_seq &&
< 			    skb_queue_len(&tp->ucopy.prequeue)) {
< do_prequeue:
< 				tcp_prequeue_process(sk);
< 
< 				if ((chunk = len - tp->ucopy.len) != 0) {
< 					net_statistics[smp_processor_id()*2+1].TCPDirectCopyFromPrequeue += chunk;
< 					len -= chunk;
< 					copied += chunk;
< 				}
< 			}
< 		}
< 		continue;
< 	found_ok_skb:
< 		/* Ok so how much can we use? */
< 		used = skb->len - offset;
< 		if (len < used)
< 			used = len;
< 
< 		if (!(flags&MSG_TRUNC)) {
< 			err = skb_copy_datagram_iovec(skb, offset, msg->msg_iov, used);
< 			if (err) {
< 				/* Exception. Bailout! */
< 				if (!copied)
< 					copied = -EFAULT;
< 				break;
< 			}
< 		}
< 
< 		*seq += used;
< 		copied += used;
< 		len -= used;
< 
< //skip_copy:
< 		if (tp->urg_data && after(tp->copied_seq,tp->urg_seq)) {
< 			tp->urg_data = 0;
< 			tcp_fast_path_check(sk, tp);
< 		}
< 		if (used + offset < skb->len)
< 			continue;
< 
< 		if (skb->h.th->fin)
< 			goto found_fin_ok;
< 		if (!(flags & MSG_PEEK))
< 			tcp_eat_skb(sk, skb);
< 		continue;
< 
< 	found_fin_ok:
< 		/* Process the FIN. */
< 		++*seq;
< 		if (!(flags & MSG_PEEK))
< 			tcp_eat_skb(sk, skb);
< 		break;
< 	} while (len > 0);
< 
< 	if (user_recv) {
< 		if (skb_queue_len(&tp->ucopy.prequeue)) {
< 			int chunk;
< 
< 			tp->ucopy.len = copied > 0 ? len : 0;
< 
< 			tcp_prequeue_process(sk);
< 
< 			if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
< 				net_statistics[smp_processor_id()*2+1].TCPDirectCopyFromPrequeue += chunk;
< 				len -= chunk;
< 				copied += chunk;
< 			}
< 		}
< 
< 		tp->ucopy.task = NULL;
< 		tp->ucopy.len = 0;
< 	}
< 	/* According to UNIX98, msg_name/msg_namelen are ignored
< 	 * on connected socket. I was just happy when found this 8) --ANK
< 	 */
< 
< 	/* Clean up data we have read: This will do ACK frames. */
< 	cluster_cleanup_rbuf(sk, copied);
< 	release_sock(sk);
< 	return copied;
< 
< out:
< 	release_sock(sk);
< 	return err;
< }
< 
< 
3129,3132d2559
< 
< 
< 
< 
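A note on what the two copy helpers above change relative to stock TCP: the ordinary tcp_sendmsg() path folds the Internet checksum into the user-to-kernel copy (csum_and_copy_from_user() and friends), while cluster_copy_to_page() and cluster_skb_add_data() do a plain copy_from_user() and leave skb->csum at zero, presumably because a software checksum is unnecessary or handled elsewhere on the cluster interconnect. The stand-alone user-space sketch below is not part of the patch; the toy internet_csum() helper is mine. It only illustrates the extra pass over the data that the cluster path avoids (the kernel fuses copy and checksum into one pass; they are kept separate here for readability).

/* Illustration only, not from the patch: the checksum work the cluster_*
 * copy helpers skip.  internet_csum() is a toy RFC 1071-style stand-in for
 * the kernel's checksum-while-copy helpers. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* 16-bit ones'-complement sum over a buffer (RFC 1071 style). */
static uint16_t internet_csum(const uint8_t *buf, size_t len)
{
	uint32_t sum = 0;
	size_t i;

	for (i = 0; i + 1 < len; i += 2)
		sum += ((uint32_t)buf[i] << 8) | buf[i + 1];
	if (len & 1)
		sum += (uint32_t)buf[len - 1] << 8;
	while (sum >> 16)
		sum = (sum & 0xffff) + (sum >> 16);
	return (uint16_t)~sum;
}

int main(void)
{
	size_t len = 64 * 1024;
	uint8_t *src = malloc(len), *dst = malloc(len);

	if (!src || !dst)
		return 1;
	memset(src, 0xab, len);

	/* Plain-TCP style: copy the data, then walk it again for the checksum
	 * (the kernel does both in a single pass). */
	memcpy(dst, src, len);
	printf("copy + checksum: csum=0x%04x\n", (unsigned)internet_csum(dst, len));

	/* Cluster style: copy only, checksum deliberately never computed. */
	memcpy(dst, src, len);
	printf("copy only:       csum left at 0\n");

	free(src);
	free(dst);
	return 0;
}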
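For completeness, here is a rough sketch of how an application might reach cluster_sendmsg() and cluster_recvmsg(). The hunks above do not show how SOCK_CLUSTER is declared or wired up to these entry points, so the constant below is only a placeholder assumption (its real definition lives elsewhere in the patch), and on an unpatched kernel the socket() call simply fails.

/* Hypothetical user-space usage sketch.  SOCK_CLUSTER's real value and
 * registration are defined elsewhere in the patch; 7 is a placeholder. */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#ifndef SOCK_CLUSTER
#define SOCK_CLUSTER 7
#endif

int main(void)
{
	struct sockaddr_in peer;
	const char msg[] = "hello, cluster";
	char reply[128];
	int fd = socket(AF_INET, SOCK_CLUSTER, 0);

	if (fd < 0) {
		/* Expected on a stock kernel without the patch. */
		perror("socket(AF_INET, SOCK_CLUSTER, 0)");
		return 1;
	}

	memset(&peer, 0, sizeof(peer));
	peer.sin_family = AF_INET;
	peer.sin_port = htons(5000);			/* example port, not from the patch */
	inet_pton(AF_INET, "10.0.0.2", &peer.sin_addr);	/* example peer, not from the patch */

	if (connect(fd, (struct sockaddr *)&peer, sizeof(peer)) < 0) {
		perror("connect");
		close(fd);
		return 1;
	}

	/* On a patched kernel, send()/recv() on this socket should end up in
	 * cluster_sendmsg()/cluster_recvmsg(), assuming the new socket type is
	 * wired to those proto ops. */
	if (send(fd, msg, sizeof(msg), 0) > 0)
		recv(fd, reply, sizeof(reply), 0);

	close(fd);
	return 0;
}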