并发,阻塞与非阻塞的有关问题
并发,阻塞与非阻塞的问题
帮忙看下
请问这是并发的么?阻塞与非阻塞应该怎么处理。
我需要一个并发非阻塞式的,请问如何实现
谢谢
帮忙看下
请问这是并发的么?阻塞与非阻塞应该怎么处理。
我需要一个并发非阻塞式的,请问如何实现
谢谢
- C/C++ code
//NTP_send.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/wait.h> #include <sys/socket.h> #include <sys/types.h> #include <sys/time.h> #include <netinet/in.h> #include <netdb.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/ioctl.h> #include<pthread.h> #define int8 char #define uint8 unsigned char #define uint32 unsigned int #define ulong32 unsigned long #define long32 long #define int32 int #define long64 long long //3600s*24h*(365days*70years+17days) #define From00to70 0x83aa7e80U #define NTPSVR "192.168.2.8" //MY server #define LOCAL "192.168.2.109" #define LOCALPORT 8000 #define NTPPORT 123 typedef struct NTPPACKET { uint8 li_vn_mode; uint8 stratum; uint8 poll; uint8 precision; //有符号整数表示本地时钟精确度 ulong32 root_delay; //到达服务器的一次往返的总延时,是15到16位有符号的定点小数 ulong32 root_dispersion; // 到达服务器的一次标准误差,是15-16位的无符号的定点小数 int8 ref_id[4]; ulong32 reftimestamphigh; //本地时钟最后被设定或校正的时间T4 ulong32 reftimestamplow; ulong32 oritimestamphigh; //向服务器请求分离客户机的时间戳,采用64位时标格式T1 ulong32 oritimestamplow; ulong32 recvtimestamphigh; //向服务器请求到客户机的时间戳,采用64位时标格式T2 ulong32 recvtimestamplow; ulong32 trantimestamphigh; //向客户机答复分离服务器的时间戳,采用64位时标格式T3,用T3来校正本地时间 ulong32 trantimestamplow; }NTPPacket; NTPPacket ntppack,newpack; //定义为long64,解决32位数的符号位问题 long64 firsttimestamp,finaltimestamp; long64 diftime,delaytime; void NTP_Init() { bzero(&ntppack,sizeof(ntppack)); ntppack.li_vn_mode=0x1b;//0|(3<<2)|(3<<5); //获取初始时间戳T1 firsttimestamp="From00to70"+time(NULL);//-8*3600; ntppack.oritimestamphigh=htonl(firsttimestamp); } fd_set inset1; int32 sockfd; struct timeval tv,tv1; struct timezone tz; struct sockaddr_in addr,local_addr; pthread_t tidA,tidB; #define capacity 2 //定义线程的数量 #define MAXORDER 10 #define timer 5000 //每970 us 发个包,计划是1000 us 发一次,考虑到了网络延时30 us int counter = 0; //所以完全发完应该是1 s int tmp = 0; int fd[2]; #define maxpkt 100 //一次发包量 int tid_capacity = 0; //第几个线程 char order[MAXORDER]; int rec_maxpkt; //接收总量 int rec_pkt = 0; //接收计数 rec_maxpkt=maxpkt*capacity; //计划接收量 pid_t pid; pthread_mutex_t ntppack_mutex = PTHREAD_MUTEX_INITIALIZER;//init pthread pthread_mutex_t newpack_mutex = PTHREAD_MUTEX_INITIALIZER;//init pthread //用于接收服务器返回的请求,如果接收数与发送的包相同则 //打印结果,否则接收等待 void *recv_pkt(void *); void *recv_pkt(void *vptr) { int num=0; float rec_ppkt=0; while(recv(sockfd,&newpack,sizeof(newpack),0) >0 ) { //printf("In revc ok\n"); pthread_mutex_trylock(&newpack_mutex); rec_pkt++; printf("\n Packets have been receive %d \n",rec_pkt); pthread_mutex_unlock(&newpack_mutex); //printf("rec_maxpkt =%d\n",rec_maxpkt); //rec_ppkt=(float)(rec_pkt)/(float)(rec_maxpkt); //printf("receive percent = %f %\n",rec_ppkt*100); //printf("lost packets = %d \n",rec_maxpkt-rec_pkt); //printf("lost percent = %f % \n",(1-rec_ppkt)*100); if(rec_maxpkt==rec_pkt) { // maxpkt=maxpkt*2; printf ("total have been receive %d packets\n\n",rec_pkt); exit(1); // close(fd[0]); //write(fd[1], NULL, 3); } } } void *send_pkt(void *); void *send_pkt (void *vptr) { //every pthread need send X packets int num=0; for(num;num<maxpkt;num++) { pthread_mutex_trylock(&ntppack_mutex); //加锁 counter=tmp+1; //发送数据请求包 sendto(sockfd,&ntppack,sizeof(ntppack),0, (struct sockaddr *)&addr,sizeof(struct sockaddr)); tmp=counter; printf("have been send %d packets\n",counter); // pthread_create(&tidB,NULL,recv_pkt,NULL); //创建线程 // pthread_join (tidB,NULL); pthread_mutex_unlock(&ntppack_mutex); //解锁 usleep(timer); //计算每X us发一个包 /* if(counter==1000) { usleep(); printf("send ok \n"); printf("have been send %d packets\n",counter); // usleep(2000000); exit(1); } */ } return NULL; } int main(int argc, char *argv[] ) { addr.sin_family=AF_INET; //IPV4协议 addr.sin_port =htons(NTPPORT); //NTP专用的123端口 addr.sin_addr.s_addr=inet_addr(NTPSVR); //校时服务器 bzero(&(addr.sin_zero),8); //清零 local_addr.sin_family=AF_INET; local_addr.sin_port=htons(LOCALPORT); local_addr.sin_addr.s_addr=inet_addr(LOCAL); bzero(&(local_addr.sin_zero),8); if((sockfd=socket(AF_INET,SOCK_DGRAM,0))<0) { perror("create socket error!\n"); exit(1); } //bind a port 8000 //允许重复绑定 int tmp = 1; int bindport=0; setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR, & tmp, sizeof(tmp)); bindport=bind(sockfd,(struct sockaddr *)&local_addr,sizeof(struct sockaddr)); //struct sockaddr if(bindport==-1) { perror("bind error"); exit(1); } NTP_Init();; if(pipe(fd)<0) { perror("pipe error!\n"); exit(1); } pid=fork(); if(pid==0) //f程负责发送 { // usleep(100); //并发线程 for (tid_capacity;tid_capacity<capacity;tid_capacity++) { /* * create 10 pthreads */ printf("In main send!\n"); pthread_create (&tidA,NULL,send_pkt,NULL); //创建线程 pthread_join (tidA,NULL); //回收线程 // close(fd[1]); // if(read(fd[0],order,MAXORDER)>0) // { // goto loop; // } } } if(pid>0) //z程负责接收 { printf("In main recv!\n"); pthread_create(&tidB,NULL,recv_pkt,NULL); //创建线程 pthread_join (tidB,NULL); //回收线程 } close(sockfd); // } }