Bug#943667: buster-pu: package python-oslo.messaging/8.1.3-1 -> 8.1.4-1+deb10u1
Package: release.debian.org
Severity: normal
Tags: buster
User: release.debian.org@packages.debian.org
Usertags: pu
Dear Stable Release team,
I'd like to upgrade oslo.messaging to version 8.1.4-1+deb10u1. Indeed,
in version 8.1.3, when a RabbitMQ server configured through transport_url
dies (or is turned off because of maintenance), the OpenStack clients,
that is, all services running on compute hosts, would attempt to
reconnect to the same RabbitMQ host. As a consequence, upgrading would
be *very* problematic, as the node wouldn't reconnect to another node
when we're upgrading one rabbit node (and as a consequence, turning off
the service there).
Attached is the debdiff for this change.
Please allow me to upload oslo.messaging 8.1.4-1+deb10u1 to Buster.
Cheers,
Thomas Goirand (zigo)
diff -Nru python-oslo.messaging-8.1.3/debian/changelog python-oslo.messaging-8.1.4/debian/changelog
--- python-oslo.messaging-8.1.3/debian/changelog 2019-05-17 14:33:29.000000000 +0200
+++ python-oslo.messaging-8.1.4/debian/changelog 2019-10-27 18:01:18.000000000 +0100
@@ -1,3 +1,10 @@
+python-oslo.messaging (8.1.4-1+deb10u1) buster; urgency=medium
+
+ * New upstream point release, with an important fix:
+ - Fix switch connection destination when a rabbitmq cluster node disappear.
+
+ -- Thomas Goirand <zigo@debian.org> Sun, 27 Oct 2019 18:01:18 +0100
+
python-oslo.messaging (8.1.3-1) unstable; urgency=medium
* New upstream point release, which includes this fix:
diff -Nru python-oslo.messaging-8.1.3/doc/requirements.txt python-oslo.messaging-8.1.4/doc/requirements.txt
--- python-oslo.messaging-8.1.3/doc/requirements.txt 2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/doc/requirements.txt 2019-08-09 21:54:57.000000000 +0200
@@ -3,7 +3,8 @@
# process, which may cause wedges in the gate later.
openstackdocstheme>=1.18.1 # Apache-2.0
-sphinx!=1.6.6,!=1.6.7,>=1.6.2 # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2,<2.0.0;python_version=='2.7' # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2;python_version>='3.4' # BSD
reno>=2.5.0 # Apache-2.0
# imported when the source code is parsed for generating documentation:
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py 2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py 2019-08-09 21:54:57.000000000 +0200
@@ -167,8 +167,34 @@
'duration': duration})
return
+ def heartbeat(self):
+ with self.listener.driver._get_connection(
+ rpc_common.PURPOSE_SEND) as conn:
+ self._send_reply(conn, None, None, ending=False)
+
+ # NOTE(sileht): Those have already be ack in RpcListener IO thread
+ # We keep them as noop until all drivers do the same
def acknowledge(self):
- self._message_operations_handler.do(self.message.acknowledge)
+ pass
+
+ def requeue(self):
+ pass
+
+
+class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
+ def acknowledge(self):
+ def _do_ack():
+ try:
+ self.message.acknowledge()
+ except Exception as exc:
+ # NOTE(kgiusti): this failure is likely due to a loss of the
+ # connection to the broker. Not much we can do in this case,
+ # especially considering the Notification has already been
+ # dispatched. This *could* result in message duplication
+ # (unacked msg is returned to the queue by the broker), but the
+ # driver tries to catch that using the msg_id_cache.
+ LOG.warning("Failed to acknowledge received message: %s", exc)
+ self._message_operations_handler.do(_do_ack)
self.listener.msg_id_cache.add(self.unique_id)
def requeue(self):
@@ -178,12 +204,12 @@
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
- self._message_operations_handler.do(self.message.requeue)
-
- def heartbeat(self):
- with self.listener.driver._get_connection(
- rpc_common.PURPOSE_SEND) as conn:
- self._send_reply(conn, None, None, ending=False)
+ def _do_requeue():
+ try:
+ self.message.requeue()
+ except Exception as exc:
+ LOG.warning("Failed to requeue received message: %s", exc)
+ self._message_operations_handler.do(_do_requeue)
class ObsoleteReplyQueuesCache(object):
@@ -256,7 +282,7 @@
else:
LOG.debug("received message with unique_id: %s", unique_id)
- self.incoming.append(AMQPIncomingMessage(
+ self.incoming.append(self.message_cls(
self,
ctxt.to_dict(),
message,
@@ -319,6 +345,41 @@
self.conn.close()
+class RpcAMQPListener(AMQPListener):
+ message_cls = AMQPIncomingMessage
+
+ def __call__(self, message):
+ # NOTE(kgiusti): In the original RPC implementation the RPC server
+ # would acknowledge the request THEN process it. The goal of this was
+ # to prevent duplication if the ack failed. Should the ack fail the
+ # request would be discarded since the broker would not remove the
+ # request from the queue since no ack was received. That would lead to
+ # the request being redelivered at some point. However this approach
+ # meant that the ack was issued from the dispatch thread, not the
+ # consumer thread, which is bad since kombu is not thread safe. So a
+ # change was made to schedule the ack to be sent on the consumer thread
+ # - breaking the ability to catch ack errors before dispatching the
+ # request. To fix this we do the actual ack here in the consumer
+ # callback and avoid the upcall if the ack fails. See
+ # https://bugs.launchpad.net/oslo.messaging/+bug/1695746
+ # for all the gory details...
+ try:
+ message.acknowledge()
+ except Exception as exc:
+ LOG.warning("Discarding RPC request due to failed acknowlege: %s",
+ exc)
+ else:
+ # NOTE(kgiusti): be aware that even if the acknowledge call
+ # succeeds there is no guarantee the broker actually gets the ACK
+ # since acknowledge() simply writes the ACK to the socket (there is
+ # no ACK confirmation coming back from the broker)
+ super(RpcAMQPListener, self).__call__(message)
+
+
+class NotificationAMQPListener(AMQPListener):
+ message_cls = NotificationAMQPIncomingMessage
+
+
class ReplyWaiters(object):
WAKE_UP = object()
@@ -590,7 +651,7 @@
def listen(self, target, batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
- listener = AMQPListener(self, conn)
+ listener = RpcAMQPListener(self, conn)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
@@ -608,7 +669,7 @@
batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
- listener = AMQPListener(self, conn)
+ listener = NotificationAMQPListener(self, conn)
for target, priority in targets_and_priorities:
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py 2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py 2019-08-09 21:54:57.000000000 +0200
@@ -373,7 +373,7 @@
# greenthread.
# So, a connection cannot be shared between thread/greenthread and
# this two variables permit to define the purpose of the connection
-# to allow drivers to add special handling if needed (like heatbeat).
+# to allow drivers to add special handling if needed (like heartbeat).
# amqp drivers create 3 kind of connections:
# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py 2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py 2019-08-09 21:54:57.000000000 +0200
@@ -610,7 +610,7 @@
# expected waiting the events drain, we start heartbeat_check and
# retrieve the server heartbeat packet only two times more than
# the minimum required for the heartbeat works
- # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
+ # (heartbeat_timeout/heartbeat_rate/2.0, default kombu
# heartbeat_rate is 2)
self._heartbeat_wait_timeout = (
float(self.heartbeat_timeout_threshold) /
@@ -635,7 +635,7 @@
# NOTE(sileht): value chosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
- # For heatbeat, we can set a bigger timeout, and check we receive the
+ # For heartbeat, we can set a bigger timeout, and check we receive the
# heartbeat packets regulary
if self._heartbeat_supported_and_enabled():
self._poll_timeout = self._heartbeat_wait_timeout
@@ -974,6 +974,14 @@
def _heartbeat_thread_job(self):
"""Thread that maintains inactive connections
"""
+ # NOTE(hberaud): Python2 doesn't have ConnectionRefusedError
+ # defined so to switch connections destination on failure
+ # with python2 and python3 we need to wrapp adapt connection refused
+ try:
+ ConnectRefuseError = ConnectionRefusedError
+ except NameError:
+ ConnectRefuseError = socket.error
+
while not self._heartbeat_exit_event.is_set():
with self._connection_lock.for_heartbeat():
@@ -990,14 +998,24 @@
self.connection.drain_events(timeout=0.001)
except socket.timeout:
pass
+ # NOTE(hberaud): In a clustered rabbitmq when
+ # a node disappears, we get a ConnectionRefusedError
+ # because the socket get disconnected.
+ # The socket access yields a OSError because the heartbeat
+ # tries to reach an unreachable host (No route to host).
+ # Catch these exceptions to ensure that we call
+ # ensure_connection for switching the
+ # connection destination.
except (socket.timeout,
+ ConnectRefuseError,
+ OSError,
kombu.exceptions.OperationalError) as exc:
LOG.info(_LI("A recoverable connection/channel error "
"occurred, trying to reconnect: %s"), exc)
self.ensure_connection()
except Exception:
- LOG.warning(_LW("Unexpected error during heartbeart "
- "thread processing, retrying..."))
+ LOG.warning(_LW("Unexpected error during heartbeat "
+ "thread processing, retrying..."))
LOG.debug('Exception', exc_info=True)
self._heartbeat_exit_event.wait(
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py
--- python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py 2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py 2019-08-09 21:54:57.000000000 +0200
@@ -152,6 +152,9 @@
def _process_incoming(self, incoming):
message = incoming[0]
+
+ # TODO(sileht): We should remove that at some point and do
+ # this directly in the driver
try:
message.acknowledge()
except Exception:
diff -Nru python-oslo.messaging-8.1.3/test-requirements.txt python-oslo.messaging-8.1.4/test-requirements.txt
--- python-oslo.messaging-8.1.3/test-requirements.txt 2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/test-requirements.txt 2019-08-09 21:54:57.000000000 +0200
@@ -31,7 +31,7 @@
pyngus>=2.2.0 # Apache-2.0
# Bandit security code scanner
-bandit>=1.1.0 # Apache-2.0
+bandit>=1.1.0,<1.6.0 # Apache-2.0
eventlet!=0.18.3,!=0.20.1,>=0.18.2 # MIT
greenlet>=0.4.10 # MIT
Reply to: