zeromq 示例

使用 zeromq 进行线程间通信的示例,示例中使用管道模型,使用的是 inproc 类型的套接字

#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 2
#define NBR_WORKERS 3


static void *
client_task(void *context)
{
    //  Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "inproc://jtt808-msg-frontendpoint");
    

    while(1)
    {
        char msg [100];
        sprintf (msg, "%lu", (long unsigned int) pthread_self());
        
        zmq_msg_t message;
        zmq_msg_init_size (&message, strlen(msg));
        memcpy (zmq_msg_data (&message), msg, strlen(msg));
        zmq_msg_send (&message, sender, 0);
    }

    zmq_close (sender);

    return NULL;
}

static void *
worker_task(void *context)
{
    //  Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "inproc://jtt808-msg-backendpoint");
    

    char msg [100];
    while (1) {
        memset(msg, 0, 100);
        
        zmq_msg_t message;
        zmq_msg_init (&message);
        zmq_msg_recv (&message, receiver, 0);       
        memcpy(msg, zmq_msg_data(&message), zmq_msg_size(&message));
        zmq_msg_close(&message);

        printf ("worker task get msg, msg = [%s], thread_id = [%lu]rn", msg, (long unsigned int) pthread_self());    
    }
    
    zmq_close (receiver);
    
    return NULL;
}

int main(void)
{

    void *context = zmq_ctx_new();

    void *frontend = zmq_socket (context, ZMQ_PULL);
    void *backend  = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (frontend, "inproc://jtt808-msg-frontendpoint");
    zmq_bind (backend,  "inproc://jtt808-msg-backendpoint");
    
    
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
        pthread_t client;
        pthread_create(&client, NULL, client_task, context);
    }

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        pthread_create(&worker, NULL, worker_task, context);
    }
    
    zmq_proxy (frontend, backend, NULL);
    

    
    //  We never get here but clean up anyhow
    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    
    return 0;
}