Linux网络编程 Part4 高并发服务器之二

线程池并发服务器

为什么要引入线程池,不直接用之前的多线程并发?

因为多线程并发要频繁地对线程进行创建与销毁(一旦有客户端传来需求,就创建一个新线程,通信结束就立刻销毁该线程),这会占用大量的CPU时间并导致不必要的系统开销

相关结构体

线程池结构体:描述线程池相关信息

pthread_mutex_t lock;               /* 用于锁住本结构体 */    
pthread_mutex_t thread_counter;     /* 记录忙状态线程个数的锁 -- busy_thr_num */

pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */

pthread_t *threads;                 /* 数组,存放线程池中每个线程的tid */
pthread_t adjust_tid;               /* 存管理线程的tid */
threadpool_task_t *task_queue;      /* 任务队列(自定义任务队列结构体数组首地址) */

int min_thr_num;                    /* 线程池最小线程数 */
int max_thr_num;                    /* 线程池最大线程数 */
int live_thr_num;                   /* 当前存活线程个数 */
int busy_thr_num;                   /* 忙状态线程个数 */
int wait_exit_thr_num;              /* 要销毁的线程个数 */

int queue_front;                    /* task_queue队头下标 */
int queue_rear;                     /* task_queue队尾下标 */
int queue_size;                     /* task_queue队中实际任务数 */
int queue_max_size;                 /* task_queue队列可容纳任务数上限 */

int shutdown;                       /* 标志位,线程池使用状态,是否关闭线程池,true或false */

任务队列结构体:在线程池结构体中充当任务队列

typedef struct {
    void *(*function)(void *);          /* 函数指针,回调函数 */
    void *arg;                          /* 上面函数的参数 */
} threadpool_task_t;                    /* 各子线程任务结构体 */

线程池模块分析

  1. main()
    • 创建线程池
    • 向线程池中添加任务(即给线程池中的线程分配任务),借助回调函数处理任务
    • 销毁线程池。
  2. threadpool_create()
    • 创建线程池结构体指针
    • 初始化线程池结构体
    • 创建n个任务线程
    • 创建1个管理者线程
    • 失败时,销毁开辟的所有空间(释放)
  3. threadpool_thread() part1
    • 进入子线程回调函数
    • 接收参数void *arg,即pool结构体
    • 对lock进行加锁(即锁住整个结构体的那个锁)
    • 判断条件变量→wait
  4. adjust_thread()
    • 进入管理者线程回调函数
    • 循环10s执行一次(管理者不用一直盯着)
    • 接收参数void *arg,即pool结构体
    • 对lock进行加锁
    • 获取管理线程池要用到的变量:task_num、live_num、busy_num
    • 根据既定算法,使用上述三个变量,判断是否应该创建、销毁线程池中指定步长数的线程
  5. threadpool_add():
    • 对lock进行加锁
    • 初始化任务队列结构体成员:设置回调函数function与函数参数arg
    • 利用环形队列机制(移动队尾指针和取余操作),实现添加任务,即入队
    • 唤醒阻塞在条件变量queue_not_empty上的线程
    • 解锁
  6. threadpool_thread() part2
    • 从3.中的wait之后继续执行
    • 获取任务处理回调函数及其参数
    • 利用环形队列机制(移动队头指针和取余操作),实现处理任务,即出队
    • 唤醒阻塞在条件变量queue_not_full上的server
    • 对lock进行解锁
    • 对thread_counter加锁
    • 修改忙线程数++
    • 对thread_counter解锁
    • 执行处理任务的线程
    • 对thread_counter加锁
    • 修改忙线程数–
    • 对thread_counter解锁
  7. 扩容/瘦身
    • 当满足扩容条件时:
      • 执行pthread_create()
      • 回调任务线程函数
      • live_num++
    • 当满足瘦身条件时:
      • 设置wait_exit_thr_num = 10;
      • 给阻塞在条件变量queue_not_empty上的线程发送假条件满足信号
      • 阻塞线程会被假信号唤醒,执行销毁流程:判断wait_exit_thr_num > 0→pthread_exit();

实现代码

//能看懂即可
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"

#define DEFAULT_TIME 10                 /*10s检测一次*/
#define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
#define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
#define true 1
#define false 0

typedef struct {
    void *(*function)(void *);          /* 函数指针,回调函数 */
    void *arg;                          /* 上面函数的参数 */
} threadpool_task_t;                    /* 各子线程任务结构体 */

/* 描述线程池相关信息 */

struct threadpool_t {
    pthread_mutex_t lock;               /* 用于锁住本结构体 */    
    pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */

    pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
    pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */

    pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
    pthread_t adjust_tid;               /* 存管理线程tid */
    threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */

    int min_thr_num;                    /* 线程池最小线程数 */
    int max_thr_num;                    /* 线程池最大线程数 */
    int live_thr_num;                   /* 当前存活线程个数 */
    int busy_thr_num;                   /* 忙状态线程个数 */
    int wait_exit_thr_num;              /* 要销毁的线程个数 */

    int queue_front;                    /* task_queue队头下标 */
    int queue_rear;                     /* task_queue队尾下标 */
    int queue_size;                     /* task_queue队中实际任务数 */
    int queue_max_size;                 /* task_queue队列可容纳任务数上限 */

    int shutdown;                       /* 标志位,线程池使用状态,true或false */
};

void *threadpool_thread(void *threadpool);

void *adjust_thread(void *threadpool);

int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);

//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
    int i;
    threadpool_t *pool = NULL;          /* 线程池 结构体 */

    do {
        if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
            printf("malloc threadpool fail");
            break;                                      /*跳出do while*/
        }

        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->busy_thr_num = 0;
        pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
        pool->wait_exit_thr_num = 0;
        pool->queue_size = 0;                           /* 有0个产品 */
        pool->queue_max_size = queue_max_size;          /* 最大任务队列数 */
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->shutdown = false;                         /* 不关闭线程池 */

        /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
        if (pool->threads == NULL) {
            printf("malloc threads fail");
            break;
        }
        memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

        /* 给 任务队列 开辟空间 */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
        if (pool->task_queue == NULL) {
            printf("malloc task_queue fail");
            break;
        }

        /* 初始化互斥琐、条件变量 */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0
                || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
                || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
                || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        {
            printf("init the lock or cond fail");
            break;
        }

        /* 启动 min_thr_num 个 work thread */
        for (i = 0; i < min_thr_num; i++) {
            pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);   /*pool指向当前线程池*/
            printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
        }
        pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);     /* 创建管理者线程 */

        return pool;

    } while (0);

    threadpool_free(pool);      /* 前面代码调用失败时,释放pool存储空间 */

    return NULL;
}

/* 向线程池中 添加一个任务 */
//threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/

int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{
    pthread_mutex_lock(&(pool->lock));

    /* ==为真,队列已经满, 调wait阻塞 */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }

    if (pool->shutdown) {
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }

    /* 清空 工作线程 调用的回调函数 的参数arg */
    if (pool->task_queue[pool->queue_rear].arg != NULL) {
        pool->task_queue[pool->queue_rear].arg = NULL;
    }

    /*添加任务到任务队列里*/
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */
    pool->queue_size++;

    /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));

    return 0;
}

/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    while (true) {
        /* Lock must be taken to wait on conditional variable */
        /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
        pthread_mutex_lock(&(pool->lock));

        /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
        while ((pool->queue_size == 0) && (!pool->shutdown)) {  
            printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

            /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
            if (pool->wait_exit_thr_num > 0) {
                pool->wait_exit_thr_num--;

                /*如果线程池里线程个数大于最小值时可以结束当前线程*/
                if (pool->live_thr_num > pool->min_thr_num) {
                    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
                    pool->live_thr_num--;
                    pthread_mutex_unlock(&(pool->lock));

                    pthread_exit(NULL);
                }
            }
        }

        /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
        if (pool->shutdown) {
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
            pthread_detach(pthread_self());
            pthread_exit(NULL);     /* 线程自行结束 */
        }

        /*从任务队列里获取任务, 是一个出队操作*/
        task.function = pool->task_queue[pool->queue_front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;

        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
        pool->queue_size--;

        /*通知可以有新的任务添加进来*/
        pthread_cond_broadcast(&(pool->queue_not_full));

        /*任务取出后,立即将 线程池琐 释放*/
        pthread_mutex_unlock(&(pool->lock));

        /*执行任务*/ 
        printf("thread 0x%x start working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
        pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
        pthread_mutex_unlock(&(pool->thread_counter));

        (*(task.function))(task.arg);                                           /*执行回调函数任务*/
        //task.function(task.arg);                                              /*执行回调函数任务*/

        /*任务结束处理*/ 
        printf("thread 0x%x end working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
        pthread_mutex_unlock(&(pool->thread_counter));
    }

    pthread_exit(NULL);
}

/* 管理线程 */
void *adjust_thread(void *threadpool)
{
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;
    while (!pool->shutdown) {

        sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/

        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;                      /* 关注 任务数 */
        int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
        pthread_mutex_unlock(&(pool->thread_counter));

        /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
            pthread_mutex_lock(&(pool->lock));  
            int add = 0;

            /*一次增加 DEFAULT_THREAD 个线程*/
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                    && pool->live_thr_num < pool->max_thr_num; i++) {
                if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
                    pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                    add++;
                    pool->live_thr_num++;
                }
            }

            pthread_mutex_unlock(&(pool->lock));
        }

        /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
        if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {

            /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
            pthread_mutex_unlock(&(pool->lock));

            for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
                /* 通知处在空闲状态的线程, 他们会自行终止*/
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }
    }

    return NULL;
}

int threadpool_destroy(threadpool_t *pool)
{
    int i;
    if (pool == NULL) {
        return -1;
    }
    pool->shutdown = true;

    /*先销毁管理线程*/
    pthread_join(pool->adjust_tid, NULL);

    for (i = 0; i < pool->live_thr_num; i++) {
        /*通知所有的空闲线程*/
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }
    for (i = 0; i < pool->live_thr_num; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    threadpool_free(pool);

    return 0;
}

int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL) {
        return -1;
    }

    if (pool->task_queue) {
        free(pool->task_queue);
    }
    if (pool->threads) {
        free(pool->threads);
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }
    free(pool);
    pool = NULL;

    return 0;
}

int threadpool_all_threadnum(threadpool_t *pool)
{
    int all_threadnum = -1;                 // 总线程数

    pthread_mutex_lock(&(pool->lock));
    all_threadnum = pool->live_thr_num;     // 存活线程数
    pthread_mutex_unlock(&(pool->lock));

    return all_threadnum;
}

int threadpool_busy_threadnum(threadpool_t *pool)
{
    int busy_threadnum = -1;                // 忙线程数

    pthread_mutex_lock(&(pool->thread_counter));
    busy_threadnum = pool->busy_thr_num;    
    pthread_mutex_unlock(&(pool->thread_counter));

    return busy_threadnum;
}

int is_thread_alive(pthread_t tid)
{
    int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活
    if (kill_rc == ESRCH) {
        return false;
    }
    return true;
}

/*测试*/ 

#if 1

/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{
    printf("thread 0x%x working on task %d\n ",(unsigned int)pthread_self(),(int)arg);
    sleep(1);                           //模拟 小---大写
    printf("task %d is end\n",(int)arg);

    return NULL;
}

int main(void)
{
    /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/

    threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/
    printf("pool inited");

    //int *num = (int *)malloc(sizeof(int)*20);
    int num[20], i;
    for (i = 0; i < 20; i++) {
        num[i] = i;
        printf("add task %d\n",i);
        
        /*int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) */

        threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */
    }

    sleep(10);                                          /* 等子线程完成任务 */
    threadpool_destroy(thp);

    return 0;
}

#endif

C/S模型-UDP

TCP/UDP通信对比

TCP:面向连接的,可靠数据包传输。对于不稳定的网络层,采取完全弥补的通信方式(丢包重传)

  • 优点
    • 数据流量稳定
    • 速度稳定
    • 按序
  • 缺点
    • 传输速度慢
    • 效率低
    • 开销大
  • 使用场景:数据的完整型要求较高,不追求效率(大数据传输、文件传输)


UDP:无连接的,不可靠的数据报传递。对于不稳定的网络层,采取完全不弥补的通信方式(默认还原网络状况)

  • 优点
    • 传输速度块
    • 效率高
    • 开销小
  • 缺点
    • 数据流量不稳定
    • 速度不稳定
    • 不按序
  • 使用场景:对时效性要求较高,稳定性其次(游戏、视频电话)

ps:QQ、微信等大型项目追求传输速度,在传输层采用UDP协议,但会在应用层补充数据校验协议,弥补UDP的不足

UDP实现的 C/S 模型

①因为UDP不用建立连接,因此服务器舍弃accpet()和connect()函数

②read()和write()函数也被舍弃,且recv()和send()函数只能用于TCP通信,因此使用recvfrom()和sendto()函数替换

③不用设计并发,UDP实现的默认就是并发服务器,因为不需要建立连接了

相关函数

ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,struct sockaddr *src_addr, socklen_t *addrlen); 接收数据,同时获取对端地址结构,相当于TCP中的read() + accept()

参数:

  • sockfd:套接字
  • buf:缓冲区地址
  • len:缓冲区大小
  • flags:0
  • src_addr:传出参数,对端地址结构(如果不关心对端地址结构,一般是客户端调用该函数时,该参数传NULL)
  • addrlen:传入传出参数(因此是指针类型),地址结构大小(如果不关心对端地址结构,该参数传0)

返回值:

  • 成功:接收数据字节数
  • 失败:-1,errno
  • 0:说明对端关闭(跟read函数一样)

ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,const struct sockaddr *dest_addr, socklen_t addrlen); 发送数据,相当于TCP中的write() + connect()

参数:

  • sockfd: 套接字
  • buf:存储数据的缓冲区
  • len:数据长度
  • flags:0
  • src_addr:传入参数,目标地址结构
  • addrlen:地址结构长度

返回值:

  • 成功:成功写出数据字节数
  • 失败:-1,errno

思路分析

server:

伪代码 {
    lfd = socket(AF_INET, SOCK_DGRAM, 0); //注意第二个参数变了,改为以UDP为代表的报式协议
    bind();
    listen(); //可有可无(因为不需要三次握手了,也就不需要设置连接上限了)

    while(1){
        recvfrom()  
        //数据处理
        sendto() 
    }
    close();
}

client:

伪代码 {
    connect_fd = socket(AF_INET, SOCK_DGRAM, 0);
    sendto();
    recvfrom();
    写到屏幕
    close();
}

代码实现

server:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/socket.h>
#include<ctype.h>
#include<arpa/inet.h>

#define SERV_PORT 9527

int main(int argc, char *argv[]) {
    int sockfd = 0;
    int ret, i;
    char buf[BUFSIZ], client_IP[1024];

    struct sockaddr_in serv_addr, clit_addr;
    socklen_t clit_addr_len;

    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family =  AF_INET;
    serv_addr.sin_port = htons(SERV_PORT);
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);

    bind(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));

    clit_addr_len = sizeof(clit_addr);

    while(1) {
        ret = recvfrom(sockfd, buf, sizeof(buf), 0, (struct sockaddr *)&clit_addr, &clit_addr_len);
        //输出查看客户端地址结构
        printf("client ip:%s port:%d\n",
                    inet_ntop(AF_INET, &clit_addr.sin_addr.s_addr, client_IP, sizeof(client_IP)),
                    ntohs(clit_addr.sin_port));
        for(i = 0; i < ret; i++) {
            buf[i] = toupper(buf[i]);
        }
        sendto(sockfd, buf, ret, 0, (struct sockaddr *)&clit_addr, clit_addr_len);
    }

    close(sockfd);

    return 0;
}

client:

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/socket.h>
#include<ctype.h>
#include<arpa/inet.h>

#define SERV_PORT 9527
#define SERV_IP "127.0.0.1"

int main(int argc, char *argv[]) {
    int sockfd = 0;
    int ret;
    char buf[BUFSIZ];
    struct sockaddr_in serv_addr;

    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family =  AF_INET; //指定服务器的地址结构
    serv_addr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET, SERV_IP, &serv_addr.sin_addr.s_addr);

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);

    while(1) {
        fgets(buf, sizeof(buf), stdin); //从屏幕输入要让服务器处理的数据
        sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); //strlen代表从屏幕写入数据后缓冲区实际的大小,而不是容量
        ret = recvfrom(sockfd, buf, sizeof(buf), 0, NULL, 0); //用不到对端服务器的地址结构了,传出参数传NULL
        write(STDOUT_FILENO, buf, ret);
    }

    close(sockfd);

    return 0;
}

socket IPC(本地套接字domin)

概论

IPC方法: pipe、fifo、mmap、信号、本地套接字(domain)

对比网络编程TCP C/S模型, 注意以下几点:

  1. int socket(int domain, int type, int protocol);
    • domain:AF_INET改为AF_UNIX/AF_LOCAL
    • type:SOCK_STREAM或SOCK_DGRAM都可以
    • protocol:依然默认传0
  2. 地址结构类型由so ckaddr_in改为sockaddr_un,依然是结构体,但只有两个成员变量
    • struct sockaddr_in srv_addr; → struct sockaddr_un srv_adrr;
      • srv_addr.sin_family = AF_INET; → srv_addr.sun_family = AF_UNIX;
      • srv_addr.sin_port = htons(8888);srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); → strcpy(srv_addr.sun_path, SERV_ADDR)
  3. bind()函数的第三个参数不能用sizeof()
    • len = offsetof(struct sockaddr_un, sun_path) + strlen(srvaddr.sun_path); //sockaddr_un中16位的sun_family+路径文件名sun_path大小
      • offsetof函数作用是求结构体struct sockaddr_un中第二个成员变量sun_path偏移了多少,也就是求第一个成员变量sun_family的大小
      • 既然知道sun_family大小是16位,为什么不把offsetof(struct sockaddr_un, sun_path)直接写成2呢?因为不让,那样就成硬编码了
    • bind(fd, (struct sockaddr *)&srv_addr, sizeof(srv_addr)); → bind(fd, (struct sockaddr *)&srv_addr, len);
  4. 在本地套接字中,bind()函数调用成功后会创建一个socket,因此为保证bind()成功,通常我们在bind()之前,可以使用unlink(SERV_ADDR)将路径文件名删除,确保在bind()前serv.sock文件不存在
  5. 客户端不能依赖“隐式绑定”,因此在通信建立过程中,需要创建并初始化两个地址结构:①client_addr用于bind()生成套接字 ②server_addr用于connect()连接通信

代码实现

server

#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <strings.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stddef.h>

#include "wrap.h"

#define SERV_ADDR  "serv.socket"

int main(void) {
    int lfd, cfd, len, size, i;
    struct sockaddr_un servaddr, cliaddr;
    char buf[4096];

    lfd = Socket(AF_UNIX, SOCK_STREAM, 0);

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sun_family = AF_UNIX;
    strcpy(servaddr.sun_path, SERV_ADDR);

    len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path);     /* servaddr total len */

    unlink(SERV_ADDR);                              /* 确保bind之前serv.sock文件不存在,bind会创建该文件 */
    Bind(lfd, (struct sockaddr *)&servaddr, len);           /* 参3不能是sizeof(servaddr) */

    Listen(lfd, 20);

    printf("Accept ...\n");
    while (1) {
        len = sizeof(cliaddr);  //AF_UNIX大小+108B

        cfd = Accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&len);

        len -= offsetof(struct sockaddr_un, sun_path);      /* 得到文件名的长度 */
        cliaddr.sun_path[len] = '\0';                       /* 确保打印时,没有乱码出现 */

        printf("client bind filename %s\n", cliaddr.sun_path);

        while ((size = read(cfd, buf, sizeof(buf))) > 0) {
            for (i = 0; i < size; i++)
                buf[i] = toupper(buf[i]);
            write(cfd, buf, size);
        }
        close(cfd);
    }
    close(lfd);

    return 0;
}

client

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>         
#include <sys/socket.h>
#include <strings.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stddef.h>

#include "wrap.h"

#define SERV_ADDR "serv.socket"
#define CLIE_ADDR "clie.socket"

int main(void) {
    int  cfd, len;
    struct sockaddr_un servaddr, cliaddr;
    char buf[4096];

    cfd = Socket(AF_UNIX, SOCK_STREAM, 0);

    bzero(&cliaddr, sizeof(cliaddr));
    cliaddr.sun_family = AF_UNIX;
    strcpy(cliaddr.sun_path,CLIE_ADDR);

    len = offsetof(struct sockaddr_un, sun_path) + strlen(cliaddr.sun_path);     /* 计算客户端地址结构有效长度 */

    unlink(CLIE_ADDR);
    Bind(cfd, (struct sockaddr *)&cliaddr, len);                                 /* 客户端也需要bind, 不能依赖自动绑定*/

    
    bzero(&servaddr, sizeof(servaddr));                                          /* 构造server 地址 */
    servaddr.sun_family = AF_UNIX;
    strcpy(servaddr.sun_path, SERV_ADDR);

    len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path);   /* 计算服务器端地址结构有效长度 */

    Connect(cfd, (struct sockaddr *)&servaddr, len);

    while (fgets(buf, sizeof(buf), stdin) != NULL) {
        write(cfd, buf, strlen(buf));
        len = read(cfd, buf, sizeof(buf));
        write(STDOUT_FILENO, buf, len);
    }

    close(cfd);

    return 0;
}


不准投币喔 👆

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇