From c2d58e525ef494b38197f50371ea4d7e8a6e6c47 Mon Sep 17 00:00:00 2001 From: Benoit Pierre Date: Sat, 17 Aug 2024 15:02:56 +0000 Subject: [PATCH] czmq & libzmq: update to 4.2.1 & 4.3.5 respectively (#12350) --- base | 2 +- frontend/ui/message/filemessagequeue.lua | 2 +- frontend/ui/message/messagequeue.lua | 2 +- frontend/ui/message/streammessagequeue.lua | 31 ++++++++++--------- .../ui/message/streammessagequeueserver.lua | 31 ++++++++++--------- 5 files changed, 35 insertions(+), 33 deletions(-) diff --git a/base b/base index d4e4cae2e..99ddf05e3 160000 --- a/base +++ b/base @@ -1 +1 @@ -Subproject commit d4e4cae2ec1d89d02ce9036a5404c8f8dc8baf50 +Subproject commit 99ddf05e3657c9f6c31d53a05023a9d598e8d8ec diff --git a/frontend/ui/message/filemessagequeue.lua b/frontend/ui/message/filemessagequeue.lua index de9c67f3f..136db9a3f 100644 --- a/frontend/ui/message/filemessagequeue.lua +++ b/frontend/ui/message/filemessagequeue.lua @@ -2,7 +2,7 @@ local ffi = require("ffi") local logger = require("logger") local MessageQueue = require("ui/message/messagequeue") local _ = require("ffi/zeromq_h") -local czmq = ffi.load("libs/libczmq.so.1") +local czmq = ffi.load("libs/libczmq.so.4") local filemq = ffi.load("libs/libfmq.so.1") local FileMessageQueue = MessageQueue:extend{ diff --git a/frontend/ui/message/messagequeue.lua b/frontend/ui/message/messagequeue.lua index da06b87eb..e8bf3fd15 100644 --- a/frontend/ui/message/messagequeue.lua +++ b/frontend/ui/message/messagequeue.lua @@ -3,7 +3,7 @@ local Event = require("ui/event") local logger = require("logger") local _ = require("ffi/zeromq_h") -local czmq = ffi.load("libs/libczmq.so.1") +local czmq = ffi.load("libs/libczmq.so.4") local MessageQueue = {} diff --git a/frontend/ui/message/streammessagequeue.lua b/frontend/ui/message/streammessagequeue.lua index 8560ac4fd..2d0b15ddc 100644 --- a/frontend/ui/message/streammessagequeue.lua +++ b/frontend/ui/message/streammessagequeue.lua @@ -3,8 +3,8 @@ local logger = require("logger") local MessageQueue = require("ui/message/messagequeue") local _ = require("ffi/zeromq_h") -local zmq = ffi.load("libs/libzmq.so.4") -local czmq = ffi.load("libs/libczmq.so.1") +local zmq = ffi.load("libs/libzmq.so.5") +local czmq = ffi.load("libs/libczmq.so.4") local C = ffi.C local StreamMessageQueue = MessageQueue:extend{ @@ -13,21 +13,25 @@ local StreamMessageQueue = MessageQueue:extend{ } function StreamMessageQueue:start() - self.context = czmq.zctx_new() - self.socket = czmq.zsocket_new(self.context, C.ZMQ_STREAM) - self.poller = czmq.zpoller_new(self.socket, nil) local endpoint = string.format("tcp://%s:%d", self.host, self.port) + self.socket = czmq.zsock_new(C.ZMQ_STREAM) + if not self.socket then + error("cannot create socket for endpoint " .. endpoint) + end logger.dbg("connecting to endpoint", endpoint) - local rc = czmq.zsocket_connect(self.socket, endpoint) - if rc ~= 0 then + if czmq.zsock_connect(self.socket, endpoint) ~= 0 then error("cannot connect to " .. endpoint) end - local id_size = ffi.new("size_t[1]", 256) + local id_size = ffi.new("size_t[1]", 255) local buffer = ffi.new("uint8_t[?]", id_size[0]) - --- @todo: Check return of zmq_getsockopt() - zmq.zmq_getsockopt(self.socket, C.ZMQ_IDENTITY, buffer, id_size) + if zmq.zmq_getsockopt(czmq.zsock_resolve(self.socket), C.ZMQ_IDENTITY, buffer, id_size) ~= 0 then + error("cannot get socket identity for endpoint " .. endpoint) + end self.id = ffi.string(buffer, id_size[0]) - logger.dbg("id", #self.id, self.id) + self.poller = czmq.zpoller_new(self.socket, nil) + if not self.poller then + error("cannot create poller for endpoint " .. endpoint) + end end function StreamMessageQueue:stop() @@ -35,10 +39,7 @@ function StreamMessageQueue:stop() czmq.zpoller_destroy(ffi.new('zpoller_t *[1]', self.poller)) end if self.socket ~= nil then - czmq.zsocket_destroy(self.context, self.socket) - end - if self.context ~= nil then - czmq.zctx_destroy(ffi.new('zctx_t *[1]', self.context)) + czmq.zsock_destroy(ffi.new('zsock_t *[1]', self.socket)) end end diff --git a/frontend/ui/message/streammessagequeueserver.lua b/frontend/ui/message/streammessagequeueserver.lua index a55a38f5e..e5496aa47 100644 --- a/frontend/ui/message/streammessagequeueserver.lua +++ b/frontend/ui/message/streammessagequeueserver.lua @@ -3,7 +3,8 @@ local logger = require("logger") local MessageQueue = require("ui/message/messagequeue") local _ = require("ffi/zeromq_h") -local czmq = ffi.load("libs/libczmq.so.1") +local zmq = ffi.load("libs/libzmq.so.5") +local czmq = ffi.load("libs/libczmq.so.4") local C = ffi.C local StreamMessageQueueServer = MessageQueue:extend{ @@ -12,15 +13,18 @@ local StreamMessageQueueServer = MessageQueue:extend{ } function StreamMessageQueueServer:start() - self.context = czmq.zctx_new() - self.socket = czmq.zsocket_new(self.context, C.ZMQ_STREAM) - self.poller = czmq.zpoller_new(self.socket, nil) local endpoint = string.format("tcp://%s:%d", self.host, self.port) - logger.dbg("StreamMessageQueueServer: Binding to endpoint", endpoint) - local rc = czmq.zsocket_bind(self.socket, endpoint) - -- If success, rc is port number - if rc == -1 then - logger.err("StreamMessageQueueServer: Cannot bind to ", endpoint) + self.socket = czmq.zsock_new(C.ZMQ_STREAM) + if not self.socket then + error("cannot create socket for endpoint " .. endpoint) + end + logger.dbg("binding to endpoint", endpoint) + if czmq.zsock_bind(self.socket, endpoint) == -1 then + error("cannot bind to " .. endpoint) + end + self.poller = czmq.zpoller_new(self.socket, nil) + if not self.poller then + error("cannot create poller for endpoint " .. endpoint) end end @@ -29,10 +33,7 @@ function StreamMessageQueueServer:stop() czmq.zpoller_destroy(ffi.new('zpoller_t *[1]', self.poller)) end if self.socket ~= nil then - czmq.zsocket_destroy(self.context, self.socket) - end - if self.context ~= nil then - czmq.zctx_destroy(ffi.new('zctx_t *[1]', self.context)) + czmq.zsock_destroy(ffi.new('zsock_t *[1]', self.socket)) end end @@ -74,13 +75,13 @@ end function StreamMessageQueueServer:send(data, id_frame) czmq.zframe_send(ffi.new('zframe_t *[1]', id_frame), self.socket, C.ZFRAME_MORE + C.ZFRAME_REUSE) - czmq.zmq_send(self.socket, ffi.cast("unsigned char*", data), #data, C.ZFRAME_MORE) + zmq.zmq_send(czmq.zsock_resolve(self.socket), ffi.cast("unsigned char*", data), #data, C.ZFRAME_MORE) -- Note: We can't use czmq.zstr_send(self.socket, data), which would stop on the first -- null byte in data (Lua strings can have null bytes inside). -- Close connection czmq.zframe_send(ffi.new('zframe_t *[1]', id_frame), self.socket, C.ZFRAME_MORE) - czmq.zmq_send(self.socket, nil, 0, 0) + zmq.zmq_send(czmq.zsock_resolve(self.socket), nil, 0, 0) end return StreamMessageQueueServer