
Bug#943667: marked as done (buster-pu: package python-oslo.messaging/8.1.3-1 -> 8.1.4-1+deb10u1)



Your message dated Sat, 16 Nov 2019 10:08:47 +0000
with message-id <83c9ffab6f08361485f70dda4733a7a24aeec09b.camel@adam-barratt.org.uk>
and subject line Closing bugs for 10.2 point release fixes
has caused the Debian Bug report #943667,
regarding buster-pu: package python-oslo.messaging/8.1.3-1 -> 8.1.4-1+deb10u1
to be marked as done.

This means that you claim that the problem has been dealt with.
If this is not the case it is now your responsibility to reopen the
Bug report if necessary, and/or fix the problem forthwith.

(NB: If you are a system administrator and have no idea what this
message is talking about, this may indicate a serious mail system
misconfiguration somewhere. Please contact owner@bugs.debian.org
immediately.)


-- 
943667: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=943667
Debian Bug Tracking System
Contact owner@bugs.debian.org with problems
--- Begin Message ---
Package: release.debian.org
Severity: normal
Tags: buster
User: release.debian.org@packages.debian.org
Usertags: pu

Dear Stable Release team,

I'd like to upgrade oslo.messaging to version 8.1.4-1+deb10u1. In
version 8.1.3, when a RabbitMQ server configured through transport_url
dies (or is shut down for maintenance), the OpenStack clients, meaning
all services running on the compute hosts, keep trying to reconnect to
that same RabbitMQ host instead of failing over to another one. As a
consequence, upgrading a RabbitMQ cluster is *very* problematic: nodes
never reconnect elsewhere while we upgrade one rabbit node (and
therefore turn off the service there).
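
For context, an HA deployment typically lists every cluster member in
transport_url, and clients are expected to fail over to the next host
in the list when one dies. A hypothetical example (host names and
credentials are illustrative only, not from the package):

  [DEFAULT]
  transport_url = rabbit://openstack:SECRET@rabbit1:5672,openstack:SECRET@rabbit2:5672/

With 8.1.3, a client connected to rabbit1 keeps retrying rabbit1
forever; with 8.1.4, it switches over to rabbit2.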

Attached is the debdiff for this change. Please allow me to upgrade
oslo.messaging in Buster.
Cheers,

Thomas Goirand (zigo)
diff -Nru python-oslo.messaging-8.1.3/debian/changelog python-oslo.messaging-8.1.4/debian/changelog
--- python-oslo.messaging-8.1.3/debian/changelog	2019-05-17 14:33:29.000000000 +0200
+++ python-oslo.messaging-8.1.4/debian/changelog	2019-10-27 18:01:18.000000000 +0100
@@ -1,3 +1,10 @@
+python-oslo.messaging (8.1.4-1+deb10u1) buster; urgency=medium
+
+  * New upstream point release, with an important fix:
+    - Fix switch connection destination when a rabbitmq cluster node disappears.
+
+ -- Thomas Goirand <zigo@debian.org>  Sun, 27 Oct 2019 18:01:18 +0100
+
 python-oslo.messaging (8.1.3-1) unstable; urgency=medium
 
   * New upstream point release, which includes this fix:
diff -Nru python-oslo.messaging-8.1.3/doc/requirements.txt python-oslo.messaging-8.1.4/doc/requirements.txt
--- python-oslo.messaging-8.1.3/doc/requirements.txt	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/doc/requirements.txt	2019-08-09 21:54:57.000000000 +0200
@@ -3,7 +3,8 @@
 # process, which may cause wedges in the gate later.
 
 openstackdocstheme>=1.18.1 # Apache-2.0
-sphinx!=1.6.6,!=1.6.7,>=1.6.2 # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2,<2.0.0;python_version=='2.7' # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2;python_version>='3.4' # BSD
 reno>=2.5.0 # Apache-2.0
 
 # imported when the source code is parsed for generating documentation:
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py	2019-08-09 21:54:57.000000000 +0200
@@ -167,8 +167,34 @@
                                      'duration': duration})
                     return
 
+    def heartbeat(self):
+        with self.listener.driver._get_connection(
+                rpc_common.PURPOSE_SEND) as conn:
+            self._send_reply(conn, None, None, ending=False)
+
+    # NOTE(sileht): These have already been acked in the RpcListener IO thread
+    # We keep them as no-ops until all drivers do the same
     def acknowledge(self):
-        self._message_operations_handler.do(self.message.acknowledge)
+        pass
+
+    def requeue(self):
+        pass
+
+
+class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
+    def acknowledge(self):
+        def _do_ack():
+            try:
+                self.message.acknowledge()
+            except Exception as exc:
+                # NOTE(kgiusti): this failure is likely due to a loss of the
+                # connection to the broker.  Not much we can do in this case,
+                # especially considering the Notification has already been
+                # dispatched. This *could* result in message duplication
+                # (unacked msg is returned to the queue by the broker), but the
+                # driver tries to catch that using the msg_id_cache.
+                LOG.warning("Failed to acknowledge received message: %s", exc)
+        self._message_operations_handler.do(_do_ack)
         self.listener.msg_id_cache.add(self.unique_id)
 
     def requeue(self):
@@ -178,12 +204,12 @@
         # msg_id_cache, the message will be reconsumed, the only difference is
         # the message stay at the beginning of the queue instead of moving to
         # the end.
-        self._message_operations_handler.do(self.message.requeue)
-
-    def heartbeat(self):
-        with self.listener.driver._get_connection(
-                rpc_common.PURPOSE_SEND) as conn:
-            self._send_reply(conn, None, None, ending=False)
+        def _do_requeue():
+            try:
+                self.message.requeue()
+            except Exception as exc:
+                LOG.warning("Failed to requeue received message: %s", exc)
+        self._message_operations_handler.do(_do_requeue)
 
 
 class ObsoleteReplyQueuesCache(object):
@@ -256,7 +282,7 @@
         else:
             LOG.debug("received message with unique_id: %s", unique_id)
 
-        self.incoming.append(AMQPIncomingMessage(
+        self.incoming.append(self.message_cls(
             self,
             ctxt.to_dict(),
             message,
@@ -319,6 +345,41 @@
         self.conn.close()
 
 
+class RpcAMQPListener(AMQPListener):
+    message_cls = AMQPIncomingMessage
+
+    def __call__(self, message):
+        # NOTE(kgiusti): In the original RPC implementation the RPC server
+        # would acknowledge the request THEN process it.  The goal of this was
+        # to prevent duplication if the ack failed.  Should the ack fail the
+        # request would be discarded since the broker would not remove the
+        # request from the queue since no ack was received.  That would lead to
+        # the request being redelivered at some point. However this approach
+        # meant that the ack was issued from the dispatch thread, not the
+        # consumer thread, which is bad since kombu is not thread safe.  So a
+        # change was made to schedule the ack to be sent on the consumer thread
+        # - breaking the ability to catch ack errors before dispatching the
+        # request.  To fix this we do the actual ack here in the consumer
+        # callback and avoid the upcall if the ack fails.  See
+        # https://bugs.launchpad.net/oslo.messaging/+bug/1695746
+        # for all the gory details...
+        try:
+            message.acknowledge()
+        except Exception as exc:
+            LOG.warning("Discarding RPC request due to failed acknowlege: %s",
+                        exc)
+        else:
+            # NOTE(kgiusti): be aware that even if the acknowledge call
+            # succeeds there is no guarantee the broker actually gets the ACK
+            # since acknowledge() simply writes the ACK to the socket (there is
+            # no ACK confirmation coming back from the broker)
+            super(RpcAMQPListener, self).__call__(message)
+
+
+class NotificationAMQPListener(AMQPListener):
+    message_cls = NotificationAMQPIncomingMessage
+
+
 class ReplyWaiters(object):
 
     WAKE_UP = object()
@@ -590,7 +651,7 @@
     def listen(self, target, batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = RpcAMQPListener(self, conn)
 
         conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                     topic=target.topic,
@@ -608,7 +669,7 @@
                                  batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = NotificationAMQPListener(self, conn)
         for target, priority in targets_and_priorities:
             conn.declare_topic_consumer(
                 exchange_name=self._get_exchange(target),
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py	2019-08-09 21:54:57.000000000 +0200
@@ -373,7 +373,7 @@
 # greenthread.
 # So, a connection cannot be shared between thread/greenthread and
 # this two variables permit to define the purpose of the connection
-# to allow drivers to add special handling if needed (like heatbeat).
+# to allow drivers to add special handling if needed (like heartbeat).
 # amqp drivers create 3 kind of connections:
 # * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
 # * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py	2019-08-09 21:54:57.000000000 +0200
@@ -610,7 +610,7 @@
         # expected waiting the events drain, we start heartbeat_check and
         # retrieve the server heartbeat packet only two times more than
         # the minimum required for the heartbeat works
-        # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
+        # (heartbeat_timeout/heartbeat_rate/2.0, default kombu
         # heartbeat_rate is 2)
         self._heartbeat_wait_timeout = (
             float(self.heartbeat_timeout_threshold) /
@@ -635,7 +635,7 @@
 
         # NOTE(sileht): value chosen according the best practice from kombu
         # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
-        # For heatbeat, we can set a bigger timeout, and check we receive the
+        # For heartbeat, we can set a bigger timeout, and check we receive the
         # heartbeat packets regulary
         if self._heartbeat_supported_and_enabled():
             self._poll_timeout = self._heartbeat_wait_timeout
@@ -974,6 +974,14 @@
     def _heartbeat_thread_job(self):
         """Thread that maintains inactive connections
         """
+        # NOTE(hberaud): Python2 doesn't have ConnectionRefusedError
+        # defined, so to switch the connection destination on failure
+        # under both python2 and python3 we need to wrap the error.
+        try:
+            ConnectRefuseError = ConnectionRefusedError
+        except NameError:
+            ConnectRefuseError = socket.error
+
         while not self._heartbeat_exit_event.is_set():
             with self._connection_lock.for_heartbeat():
 
@@ -990,14 +998,24 @@
                             self.connection.drain_events(timeout=0.001)
                         except socket.timeout:
                             pass
+                    # NOTE(hberaud): In a clustered rabbitmq when
+                    # a node disappears, we get a ConnectionRefusedError
+                    # because the socket gets disconnected.
+                    # The socket access yields an OSError because the heartbeat
+                    # tries to reach an unreachable host (No route to host).
+                    # Catch these exceptions to ensure that we call
+                    # ensure_connection for switching the
+                    # connection destination.
                     except (socket.timeout,
+                            ConnectRefuseError,
+                            OSError,
                             kombu.exceptions.OperationalError) as exc:
                         LOG.info(_LI("A recoverable connection/channel error "
                                      "occurred, trying to reconnect: %s"), exc)
                         self.ensure_connection()
                 except Exception:
-                    LOG.warning(_LW("Unexpected error during heartbeart "
-                                    "thread processing, retrying..."))
+                    LOG.warning(_LW("Unexpected error during heartbeat "
+                                "thread processing, retrying..."))
                     LOG.debug('Exception', exc_info=True)
 
             self._heartbeat_exit_event.wait(
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py
--- python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py	2019-08-09 21:54:57.000000000 +0200
@@ -152,6 +152,9 @@
 
     def _process_incoming(self, incoming):
         message = incoming[0]
+
+        # TODO(sileht): We should remove that at some point and do
+        # this directly in the driver
         try:
             message.acknowledge()
         except Exception:
diff -Nru python-oslo.messaging-8.1.3/test-requirements.txt python-oslo.messaging-8.1.4/test-requirements.txt
--- python-oslo.messaging-8.1.3/test-requirements.txt	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/test-requirements.txt	2019-08-09 21:54:57.000000000 +0200
@@ -31,7 +31,7 @@
 pyngus>=2.2.0 # Apache-2.0
 
 # Bandit security code scanner
-bandit>=1.1.0 # Apache-2.0
+bandit>=1.1.0,<1.6.0 # Apache-2.0
 
 eventlet!=0.18.3,!=0.20.1,>=0.18.2 # MIT
 greenlet>=0.4.10 # MIT
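
A side note on the impl_rabbit.py hunk above: Python 2 never defined
ConnectionRefusedError, hence the runtime alias. The pattern boils down
to the following standalone sketch (illustrative only, not code from
the package; "connection" stands in for the driver's kombu connection
wrapper):

  import socket

  # On Python 3 the builtin name exists; on Python 2 the lookup raises
  # NameError and we fall back to socket.error, which covers the same
  # "connection refused" failures there.
  try:
      ConnectRefuseError = ConnectionRefusedError
  except NameError:
      ConnectRefuseError = socket.error

  def drain(connection):
      try:
          connection.drain_events(timeout=0.001)
      except (socket.timeout, ConnectRefuseError, OSError):
          # Re-establish the connection, possibly to another cluster
          # node listed in transport_url.
          connection.ensure_connection()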

--- End Message ---
--- Begin Message ---
Package: release.debian.org
Version: 10.2

Hi,

The fixes referenced by these bugs were included in today's 10.2 stable
point release.

Regards,

Adam

--- End Message ---
