并发,阻塞与非阻塞的有关问题

并发,阻塞与非阻塞的问题
帮忙看下

请问这是并发的么?阻塞与非阻塞应该怎么处理。

我需要一个并发非阻塞式的,请问如何实现

谢谢
C/C++ code
//NTP_send.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include<pthread.h>

#define  int8      char
#define  uint8     unsigned char
#define  uint32    unsigned int
#define  ulong32   unsigned long
#define  long32    long
#define  int32     int
#define  long64    long long

//3600s*24h*(365days*70years+17days)
#define  From00to70 0x83aa7e80U

#define  NTPSVR            "192.168.2.8"        //MY server
#define  LOCAL              "192.168.2.109"
#define  LOCALPORT       8000
#define  NTPPORT           123

typedef struct NTPPACKET
{
  uint8     li_vn_mode;
  uint8     stratum;
  uint8     poll;
  uint8     precision;                                         //有符号整数表示本地时钟精确度
  ulong32   root_delay;                                   //到达服务器的一次往返的总延时,是15到16位有符号的定点小数
  ulong32   root_dispersion;                           // 到达服务器的一次标准误差,是15-16位的无符号的定点小数
  int8      ref_id[4];
  ulong32   reftimestamphigh;                      //本地时钟最后被设定或校正的时间T4
  ulong32   reftimestamplow;
  ulong32   oritimestamphigh;                //向服务器请求分离客户机的时间戳,采用64位时标格式T1
  ulong32   oritimestamplow;
  ulong32   recvtimestamphigh;             //向服务器请求到客户机的时间戳,采用64位时标格式T2
  ulong32   recvtimestamplow;
  ulong32   trantimestamphigh;           //向客户机答复分离服务器的时间戳,采用64位时标格式T3,用T3来校正本地时间 
  ulong32   trantimestamplow;
}NTPPacket;

NTPPacket  ntppack,newpack;

//定义为long64,解决32位数的符号位问题
long64   firsttimestamp,finaltimestamp;
long64   diftime,delaytime;

void NTP_Init()
{
  bzero(&ntppack,sizeof(ntppack));
  ntppack.li_vn_mode=0x1b;//0|(3<<2)|(3<<5);
  //获取初始时间戳T1
  firsttimestamp="From00to70"+time(NULL);//-8*3600;
  ntppack.oritimestamphigh=htonl(firsttimestamp);
}

  fd_set  inset1;
  int32  sockfd;
  struct timeval tv,tv1;
  struct timezone tz;
  struct sockaddr_in addr,local_addr;   
  pthread_t tidA,tidB;
  
#define  capacity  2  //定义线程的数量
#define MAXORDER 10
 
#define           timer                       5000                   //每970 us 发个包,计划是1000 us 发一次,考虑到了网络延时30 us
int                 counter                 =      0;                  //所以完全发完应该是1 s
int                 tmp                       =      0;

int                 fd[2];
#define          maxpkt                       100                  //一次发包量
int                 tid_capacity           =     0;                   //第几个线程
char               order[MAXORDER];

int                 rec_maxpkt;                          //接收总量
int                 rec_pkt                 =     0;           //接收计数
 rec_maxpkt=maxpkt*capacity;  //计划接收量    
 
pid_t pid;
pthread_mutex_t ntppack_mutex = PTHREAD_MUTEX_INITIALIZER;//init pthread
pthread_mutex_t newpack_mutex = PTHREAD_MUTEX_INITIALIZER;//init pthread

//用于接收服务器返回的请求,如果接收数与发送的包相同则
//打印结果,否则接收等待
void *recv_pkt(void *);

void *recv_pkt(void *vptr)
{
  int num=0;
  float  rec_ppkt=0;

  while(recv(sockfd,&newpack,sizeof(newpack),0) >0 )
     { 
   
              
       //printf("In revc ok\n");
       pthread_mutex_trylock(&newpack_mutex);
    
    rec_pkt++;

      printf("\n Packets have been  receive %d \n",rec_pkt);
    pthread_mutex_unlock(&newpack_mutex);

    //printf("rec_maxpkt =%d\n",rec_maxpkt);
    //rec_ppkt=(float)(rec_pkt)/(float)(rec_maxpkt);
    //printf("receive percent = %f   %\n",rec_ppkt*100);
    //printf("lost packets = %d \n",rec_maxpkt-rec_pkt);
    //printf("lost percent = %f  % \n",(1-rec_ppkt)*100);
    if(rec_maxpkt==rec_pkt)
          {
         //   maxpkt=maxpkt*2;
             printf ("total have been receive  %d  packets\n\n",rec_pkt);
                exit(1);
            //  close(fd[0]);
        //write(fd[1], NULL, 3);    
   }
}
}

void *send_pkt(void *);

void  *send_pkt  (void *vptr)
   { 
    //every pthread need send X packets
         int  num=0;

    for(num;num<maxpkt;num++)
        {
        pthread_mutex_trylock(&ntppack_mutex);                     //加锁   

         counter=tmp+1;
                 //发送数据请求包
                 sendto(sockfd,&ntppack,sizeof(ntppack),0,
                                  (struct sockaddr *)&addr,sizeof(struct sockaddr));
           
           tmp=counter;
          
                 printf("have been send  %d  packets\n",counter);
            //       pthread_create(&tidB,NULL,recv_pkt,NULL);            //创建线程      
                      //            pthread_join (tidB,NULL);    
          pthread_mutex_unlock(&ntppack_mutex);                  //解锁
                usleep(timer);     //计算每X us发一个包
/*              if(counter==1000)
                    {
                   usleep();
                    printf("send ok \n");
                    printf("have been send  %d  packets\n",counter);
               //    usleep(2000000);
                    exit(1);
                   }
 */               
        }
    return NULL;
   }  




int main(int argc, char *argv[] )
{
 
  
   addr.sin_family=AF_INET;   //IPV4协议
  addr.sin_port =htons(NTPPORT);   //NTP专用的123端口
   addr.sin_addr.s_addr=inet_addr(NTPSVR);   //校时服务器
   bzero(&(addr.sin_zero),8);   //清零 
 
   local_addr.sin_family=AF_INET;
   local_addr.sin_port=htons(LOCALPORT);
   local_addr.sin_addr.s_addr=inet_addr(LOCAL);
   bzero(&(local_addr.sin_zero),8); 


  if((sockfd=socket(AF_INET,SOCK_DGRAM,0))<0)
    {
      perror("create socket error!\n");
      exit(1);
    }

//bind a port 8000 
     //允许重复绑定

        int tmp = 1;
        int  bindport=0;
     setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR, & tmp, sizeof(tmp));
     bindport=bind(sockfd,(struct sockaddr *)&local_addr,sizeof(struct sockaddr));  //struct sockaddr
    if(bindport==-1)
      {
         perror("bind error");
           exit(1);
       }

   NTP_Init();;
    if(pipe(fd)<0)
       {
        perror("pipe error!\n");
    exit(1);
      }

     pid=fork();

    if(pid==0)                                          //f程负责发送

    {                          
                   // usleep(100);               
                        //并发线程
                   for (tid_capacity;tid_capacity<capacity;tid_capacity++)
                           {
                          /*
                       *  create 10 pthreads
                      */
                           printf("In main send!\n");                                 
                        pthread_create (&tidA,NULL,send_pkt,NULL);            //创建线程      
              pthread_join (tidA,NULL);                                    //回收线程    
                       //             close(fd[1]);
                     //        if(read(fd[0],order,MAXORDER)>0)
                      //           {
                           //       goto loop;
                       //          }
                }
                      
       }

   if(pid>0)                          //z程负责接收
     {
               printf("In main recv!\n");
                 pthread_create(&tidB,NULL,recv_pkt,NULL);            //创建线程                      
          pthread_join (tidB,NULL);                                    //回收线程          
         }

   close(sockfd);
//      }
}