
使用 ZeroMQ 进行线程间通信的示例。示例采用管道(PUSH/PULL)模型,使用 inproc 类型的套接字。
#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 2
#define NBR_WORKERS 3
/*
 * Client thread entry point.
 *
 * Opens a PUSH socket, connects it to the frontend inproc endpoint and then
 * sends this thread's id (formatted as decimal text, without a terminating
 * NUL) over and over.
 *
 * context: the ZeroMQ context passed in by the thread creator; it is handed
 *          straight to zmq_socket().
 * Returns NULL in all cases. The send loop ends only when a ZeroMQ call
 * fails for a reason other than EINTR; the socket is closed before returning.
 */
static void *
client_task(void *context)
{
    // Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    if (sender == NULL) {
        fprintf (stderr, "client: zmq_socket failed: %s\n", zmq_strerror (errno));
        return NULL;
    }
    if (zmq_connect (sender, "inproc://jtt808-msg-frontendpoint") != 0) {
        fprintf (stderr, "client: zmq_connect failed: %s\n", zmq_strerror (errno));
        zmq_close (sender);
        return NULL;
    }

    // The payload is this thread's id, which never changes: format it once
    // (bounded) instead of on every iteration.
    char msg [100];
    snprintf (msg, sizeof msg, "%lu", (long unsigned int) pthread_self());
    size_t msg_len = strlen (msg);

    while (1) {
        zmq_msg_t message;
        if (zmq_msg_init_size (&message, msg_len) != 0) {
            fprintf (stderr, "client: zmq_msg_init_size failed: %s\n", zmq_strerror (errno));
            break;
        }
        memcpy (zmq_msg_data (&message), msg, msg_len);

        if (zmq_msg_send (&message, sender, 0) == -1) {
            // Save errno first: zmq_msg_close() may overwrite it.
            int err = errno;
            // A failed send leaves ownership of the message with the caller,
            // so it has to be released here to avoid a leak.
            zmq_msg_close (&message);
            if (err == EINTR)
                continue;   // interrupted by a signal: just try again
            fprintf (stderr, "client: zmq_msg_send failed: %s\n", zmq_strerror (err));
            break;
        }
    }

    zmq_close (sender);
    return NULL;
}
/*
 * Worker thread entry point.
 *
 * Opens a PULL socket, connects it to the backend inproc endpoint and prints
 * every message it receives together with this thread's id.
 *
 * context: the ZeroMQ context passed in by the thread creator; it is handed
 *          straight to zmq_socket().
 * Returns NULL in all cases. The receive loop ends only when a ZeroMQ call
 * fails for a reason other than EINTR; the socket is closed before returning.
 *
 * Payloads longer than the local buffer are truncated for printing.
 */
static void *
worker_task(void *context)
{
    // Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    if (receiver == NULL) {
        fprintf (stderr, "worker: zmq_socket failed: %s\n", zmq_strerror (errno));
        return NULL;
    }
    if (zmq_connect (receiver, "inproc://jtt808-msg-backendpoint") != 0) {
        fprintf (stderr, "worker: zmq_connect failed: %s\n", zmq_strerror (errno));
        zmq_close (receiver);
        return NULL;
    }

    char msg [100];
    while (1) {
        zmq_msg_t message;
        zmq_msg_init (&message);

        if (zmq_msg_recv (&message, receiver, 0) == -1) {
            // Save errno first: zmq_msg_close() may overwrite it.
            int err = errno;
            zmq_msg_close (&message);
            if (err == EINTR)
                continue;   // interrupted by a signal: just try again
            fprintf (stderr, "worker: zmq_msg_recv failed: %s\n", zmq_strerror (err));
            break;
        }

        // The payload is raw bytes of arbitrary length with no terminating
        // NUL: clamp the copy to the buffer and terminate it explicitly so
        // it can be printed with %s.
        size_t len = zmq_msg_size (&message);
        if (len > sizeof msg - 1)
            len = sizeof msg - 1;
        memcpy (msg, zmq_msg_data (&message), len);
        msg [len] = '\0';
        zmq_msg_close (&message);

        printf ("worker task get msg, msg = [%s], thread_id = [%lu]\r\n", msg, (long unsigned int) pthread_self());
    }

    zmq_close (receiver);
    return NULL;
}
/*
 * Program entry point.
 *
 * Binds a PULL "frontend" and a PUSH "backend" inproc endpoint, starts
 * NBR_CLIENTS client threads and NBR_WORKERS worker threads that share the
 * same context, then runs zmq_proxy() to forward everything arriving on the
 * frontend to the backend.
 *
 * The endpoints are bound before any thread is created, so every thread's
 * zmq_connect() runs after the matching zmq_bind().
 *
 * Returns 0 if the proxy returns, 1 if setup fails.
 */
int main(void)
{
    int exit_code = 1;
    void *frontend = NULL;
    void *backend = NULL;

    void *context = zmq_ctx_new();
    if (context == NULL) {
        fprintf (stderr, "main: zmq_ctx_new failed: %s\n", zmq_strerror (errno));
        return 1;
    }

    frontend = zmq_socket (context, ZMQ_PULL);
    backend = zmq_socket (context, ZMQ_PUSH);
    if (frontend == NULL || backend == NULL) {
        fprintf (stderr, "main: zmq_socket failed: %s\n", zmq_strerror (errno));
        goto cleanup;
    }
    if (zmq_bind (frontend, "inproc://jtt808-msg-frontendpoint") != 0
        || zmq_bind (backend, "inproc://jtt808-msg-backendpoint") != 0) {
        fprintf (stderr, "main: zmq_bind failed: %s\n", zmq_strerror (errno));
        goto cleanup;
    }

    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
        pthread_t client;
        // pthread_create reports failure through its return value, not errno.
        int err = pthread_create(&client, NULL, client_task, context);
        if (err != 0) {
            fprintf (stderr, "main: pthread_create (client) failed: %s\n", strerror (err));
            // Threads may already be running and holding sockets open, so a
            // context teardown here could block; just end the process.
            return 1;
        }
        // The thread is never joined; detach it so its resources are
        // released when it finishes.
        pthread_detach (client);
    }

    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
        pthread_t worker;
        int err = pthread_create(&worker, NULL, worker_task, context);
        if (err != 0) {
            fprintf (stderr, "main: pthread_create (worker) failed: %s\n", strerror (err));
            return 1;
        }
        pthread_detach (worker);
    }

    zmq_proxy (frontend, backend, NULL);
    // We never get here but clean up anyhow
    exit_code = 0;

cleanup:
    if (frontend != NULL)
        zmq_close (frontend);
    if (backend != NULL)
        zmq_close (backend);
    zmq_ctx_destroy (context);
    return exit_code;
}




近期评论