tcp posix api
send发送
/*
 * nsend - queue application data for transmission on a TCP stream.
 *
 * sockfd: descriptor resolved through get_host_fromfd().
 * buf:    bytes to send.
 * len:    number of bytes in buf.
 * flags:  unused.
 *
 * The payload is copied into a freshly allocated fragment, which is then
 * placed on the stream's send ring.
 *
 * Returns the number of bytes queued, 0 when the descriptor is not a TCP
 * stream, or -1 on failure (unknown descriptor, allocation failure, or
 * send ring full).
 */
ssize_t nsend(int sockfd, const void *buf, size_t len, __attribute__((unused))int flags) {
    void *hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    struct ln_tcp_stream *stream = (struct ln_tcp_stream *)hostinfo;
    if (stream->proto != IPPROTO_TCP) {
        return 0;
    }

    struct ln_tcp_fragment *fragment = rte_malloc("send frag", sizeof(struct ln_tcp_fragment), 0);
    if (fragment == NULL) {
        /* Was "return 0"; report allocation failure like every other error path. */
        return -1;
    }
    memset(fragment, 0, sizeof(struct ln_tcp_fragment));

    /*
     * The stream's ports are swapped for the outgoing segment.
     * NOTE(review): presumably the stream stores them from the peer's
     * point of view - confirm against the code that fills the stream.
     */
    fragment->sport = stream->dport;
    fragment->dport = stream->sport;
    fragment->acknum = stream->recv_next;
    fragment->seqnum = stream->send_next;
    fragment->windows = LN_TCP_INITIAL_WINDOWS;
    fragment->tcp_flags = RTE_TCP_ACK_FLAG | RTE_TCP_PSH_FLAG;
    fragment->hdr_off = 0x50; /* upper nibble 5 => 5 * 4 = 20-byte header, no options */

    /* One spare byte keeps the copied payload NUL-terminated. */
    fragment->data = rte_malloc("frag data", len + 1, 0);
    if (fragment->data == NULL) {
        rte_free(fragment);
        return -1;
    }
    memset(fragment->data, 0, len + 1);
    rte_memcpy(fragment->data, buf, len);
    fragment->length = len;

    /* Read the result before enqueueing: afterwards the consumer owns the fragment. */
    ssize_t length = fragment->length;

    /* Fix: the ring is stream->sendbuf; stream->send_next is a sequence number. */
    if (rte_ring_mp_enqueue(stream->sendbuf, (void *)fragment) != 0) {
        rte_free(fragment->data);
        rte_free(fragment);
        return -1;
    }

    return length;
}
recv接收
/*
 * nrecv - blocking receive from a TCP stream.
 *
 * sockfd: descriptor resolved through get_host_fromfd().
 * buf:    destination buffer.
 * len:    capacity of buf in bytes.
 * flags:  unused.
 *
 * Blocks on the stream's condition variable until a fragment can be taken
 * from the receive ring, then copies at most len bytes into buf. A
 * zero-length fragment is consumed and reported as 0.
 *
 * Returns the number of bytes copied into buf, 0 for a zero-length
 * fragment or a non-TCP descriptor, and -1 for an unknown descriptor.
 */
ssize_t nrecv(int sockfd, void *buf, size_t len, __attribute__((unused))int flags) {
    void *hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    struct ln_tcp_stream *stream = (struct ln_tcp_stream *)hostinfo;
    if (stream->proto != IPPROTO_TCP) {
        return 0;
    }

    struct ln_tcp_fragment *fragment = NULL;

    /* Sleep until the ring yields a fragment. */
    pthread_mutex_lock(&stream->mutex);
    while (rte_ring_mc_dequeue(stream->recvbuf, (void **)&fragment) < 0) {
        pthread_cond_wait(&stream->cond, &stream->mutex);
    }
    pthread_mutex_unlock(&stream->mutex);

    size_t avail = (size_t)fragment->length;

    if (avail == 0) {
        /* Zero-length fragment: nothing to copy, just release the descriptor. */
        rte_free(fragment);
        return 0;
    }

    if (avail > len) {
        /* Caller's buffer is too small: hand over len bytes, keep the rest. */
        rte_memcpy(buf, fragment->data, len);

        size_t remaining = avail - len;
        for (size_t i = 0; i < remaining; i++) {
            fragment->data[i] = fragment->data[i + len];
        }
        fragment->length = remaining;

        /*
         * The leftover is put back on the same ring.
         * NOTE(review): rte_ring enqueue appends at the tail, so the
         * leftover lands behind fragments that arrived later - confirm
         * whether ordering matters here.
         */
        if (rte_ring_mp_enqueue(stream->recvbuf, (void *)fragment) != 0) {
            /* Ring full: release the fragment instead of leaking it. */
            rte_free(fragment->data);
            rte_free(fragment);
        }

        /* Fix: report the bytes delivered, not the bytes still pending. */
        return (ssize_t)len;
    }

    /* Fix: copy only what the fragment holds; copying len over-read the fragment buffer. */
    rte_memcpy(buf, fragment->data, avail);
    rte_free(fragment->data);
    fragment->data = NULL;
    rte_free(fragment);

    return (ssize_t)avail;
}
数据包收发管理
接收
/*
 * ln_tcp_enqueue_recvbuf - copy a segment's payload onto the stream's
 * receive ring and wake a reader waiting on the stream's condition variable.
 *
 * stream: destination stream.
 * tcphdr: start of the TCP header of the received segment.
 * tcplen: length in bytes of the TCP header plus payload.
 *
 * When tcplen does not exceed the header length, a zero-length fragment
 * (data == NULL) is enqueued instead.
 *
 * Returns 0 on success, -1 on allocation failure or when the ring is full.
 */
static int ln_tcp_enqueue_recvbuf(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr, int tcplen) {
    /* Fix: rte_malloc takes (type, size, align); the align argument was missing. */
    struct ln_tcp_fragment *fragment = rte_malloc("tcp frag", sizeof(struct ln_tcp_fragment), 0);
    if (fragment == NULL) {
        return -1;
    }
    memset(fragment, 0, sizeof(struct ln_tcp_fragment));

    fragment->dport = ntohs(tcphdr->dst_port);
    fragment->sport = ntohs(tcphdr->src_port);

    /* The upper nibble of data_off is the header length in 32-bit words. */
    uint8_t hdrlen = tcphdr->data_off >> 4;
    int payloadlen = tcplen - hdrlen * 4;

    if (payloadlen > 0) {
        uint8_t *payload = (uint8_t *)tcphdr + hdrlen * 4;

        /* One spare byte keeps the copied payload NUL-terminated. */
        fragment->data = rte_malloc("frag data", payloadlen + 1, 0);
        if (fragment->data == NULL) {
            rte_free(fragment);
            return -1;
        }
        /*
         * Fix: clear the payload buffer. The old code cleared `fragment`,
         * overrunning the descriptor and wiping the data pointer just set.
         */
        memset(fragment->data, 0, payloadlen + 1);
        rte_memcpy(fragment->data, payload, payloadlen);
        fragment->length = payloadlen;
    }
    else {
        /* No payload (or tcplen shorter than the header): zero-length fragment. */
        fragment->length = 0;
        fragment->data = NULL;
    }

    if (rte_ring_mp_enqueue(stream->recvbuf, (void *)fragment) != 0) {
        /* Ring full: release everything instead of leaking it. */
        rte_free(fragment->data);
        rte_free(fragment);
        return -1;
    }

    pthread_mutex_lock(&stream->mutex);
    pthread_cond_signal(&stream->cond);
    pthread_mutex_unlock(&stream->mutex);

    return 0;
}
发送
/*
 * ln_tcp_send_ackpkt - queue a payload-less ACK segment on the stream's
 * send ring, using the stream's current recv_next/send_next values.
 *
 * stream: stream to acknowledge on.
 * tcphdr: header of the segment being acknowledged (currently unused).
 *
 * Returns 0 on success, -1 on allocation failure or when the ring is full.
 */
static int ln_tcp_send_ackpkt(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr) {
    (void)tcphdr; /* kept for interface compatibility */

    struct ln_tcp_fragment *ack = rte_malloc("ack frag", sizeof(struct ln_tcp_fragment), 0);
    if (ack == NULL) {
        return -1;
    }
    memset(ack, 0, sizeof(struct ln_tcp_fragment));

    /* Ports are swapped relative to the stream, as in the other send paths. */
    ack->sport = stream->dport;
    ack->dport = stream->sport;
    ack->acknum = stream->recv_next;
    ack->seqnum = stream->send_next;
    ack->windows = LN_TCP_INITIAL_WINDOWS;
    ack->hdr_off = 0x50; /* upper nibble 5 => 20-byte header, no options */
    ack->tcp_flags = RTE_TCP_ACK_FLAG;
    ack->length = 0;
    ack->data = NULL;

    if (rte_ring_mp_enqueue(stream->sendbuf, (void *)ack) != 0) {
        /* Ring full: free the fragment and report the failure instead of leaking. */
        rte_free(ack);
        return -1;
    }

    return 0;
}
established handle
/*
 * ln_tcp_handle_close_established - process a received TCP segment for a
 * stream, acting on its PSH and FIN flags.
 *
 * stream: stream the segment belongs to.
 * tcphdr: start of the segment's TCP header.
 * tcplen: length in bytes of the TCP header plus payload.
 *
 * PSH: the payload is queued for the reader, recv_next advances by the
 *      payload length, and an ACK is queued.
 * FIN: the stream moves to LN_TCP_STATUS_CLOSE_WAIT, a zero-length
 *      fragment is queued for the reader, recv_next advances by one, and
 *      an ACK is queued.
 * SYN and a bare ACK are currently ignored.
 *
 * Always returns 0.
 */
static int ln_tcp_handle_close_established(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr, int tcplen) {
    /* The upper nibble of data_off is the header length in 32-bit words. */
    const int hdrbytes = (tcphdr->data_off >> 4) * 4;

    /* RTE_TCP_SYN_FLAG: nothing to do. */

    if (tcphdr->tcp_flags & RTE_TCP_PSH_FLAG) {
        /* Fix: payload length comes from tcplen; the old code subtracted from the header pointer. */
        int payloadlen = tcplen - hdrbytes;
        if (payloadlen < 0) {
            payloadlen = 0;
        }

        /*
         * Advance recv_next only when the data was actually queued, so a
         * dropped segment is not acknowledged and the peer can resend it.
         */
        if (ln_tcp_enqueue_recvbuf(stream, tcphdr, tcplen) == 0) {
            stream->recv_next = stream->recv_next + payloadlen;
        }
        stream->send_next = ntohl(tcphdr->recv_ack);
        ln_tcp_send_ackpkt(stream, tcphdr);
    }

    /* RTE_TCP_ACK_FLAG: nothing to do. */

    if (tcphdr->tcp_flags & RTE_TCP_FIN_FLAG) {
        stream->status = LN_TCP_STATUS_CLOSE_WAIT;

        /*
         * Passing the header length in bytes as the total length gives a
         * payload of 0, so a zero-length fragment is queued for the reader.
         * The old code passed the length in 32-bit words.
         */
        ln_tcp_enqueue_recvbuf(stream, tcphdr, hdrbytes);

        /* FIN consumes one sequence number. */
        stream->recv_next = stream->recv_next + 1;
        stream->send_next = ntohl(tcphdr->recv_ack);
        ln_tcp_send_ackpkt(stream, tcphdr);
    }

    return 0;
}
tcp server
/*
 * tcp_server_entry - TCP echo server built on the n* socket API.
 *
 * arg: unused.
 *
 * Binds to port 8888 on INADDR_ANY, accepts one connection at a time, and
 * echoes every received chunk back until nrecv() returns 0, at which
 * point the connection is closed and the next one is accepted.
 *
 * Returns -1 when the listening socket cannot be set up; otherwise it
 * loops forever.
 */
static int tcp_server_entry(__attribute__((unused)) void* arg) {
    int sockfd = nsocket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        return -1;
    }
    printf("tcp sockfd is %d\n", sockfd);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(8888);

    /* NOTE(review): assumes nbind/nlisten report failure with a negative value, like bind/listen. */
    if (nbind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        nclose(sockfd);
        return -1;
    }
    if (nlisten(sockfd, 10) < 0) {
        nclose(sockfd);
        return -1;
    }

    while (1) {
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int connfd = naccept(sockfd, (struct sockaddr *)&client, &len);
        if (connfd < 0) {
            /* Nothing usable was accepted; try again. */
            continue;
        }

        char buff[BUFFER_SIZE] = {0};
        while (1) {
            ssize_t n = nrecv(connfd, buff, BUFFER_SIZE, 0); /* blocks */
            if (n > 0) {
                /*
                 * Print exactly n bytes: buff has no terminator when n ==
                 * BUFFER_SIZE and may hold stale bytes from a longer
                 * earlier message.
                 */
                printf("recv: %.*s\n", (int)n, buff);
                nsend(connfd, buff, (size_t)n, 0);
            }
            else if (n == 0) {
                /* Zero-length read: the connection is finished. */
                nclose(connfd);
                break;
            }
            else {
                /* Error on a blocking read: leave the loop instead of spinning forever. */
                break;
            }
        }
    }

    /* Not reached: the accept loop never exits. */
    nclose(sockfd);
    return 0;
}
效果展示
回去之后调好了,有点难调
总结
到目前为止,IP/TCP和IP/UDP的协议栈都写完了,但是没有并发效果;这个后面会解决。下一步是探索一下协议的扩展,写一个dns服务器来看一下如何基于tcp或者udp来扩展协议。
项目地址
项目地址
参考资料:https://github.com/0voice