[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

Bug#943667: buster-pu: package python-oslo.messaging/8.1.3-1 -> 8.1.4-1+deb10u1



Package: release.debian.org
Severity: normal
Tags: buster
User: release.debian.org@packages.debian.org
Usertags: pu

Dear Stable Release team,

I'd like to upgrade oslo.messaging to version 8.1.4-1+deb10u1. Indeed,
in version 8.1.3, when a RabbitMQ server configured through transport_url
dies (or is turned off for maintenance), the OpenStack clients, that is,
all services running on compute hosts, would attempt to reconnect to
the same RabbitMQ host. As a consequence, upgrading would be *very*
problematic, as a compute host wouldn't reconnect to another RabbitMQ
node when we're upgrading one rabbit node (and, as a consequence,
turning off the service there).

Attached is the debdiff for this change.
Please allow me to upgrade oslo.messaging in Buster.
Cheers,

Thomas Goirand (zigo)
diff -Nru python-oslo.messaging-8.1.3/debian/changelog python-oslo.messaging-8.1.4/debian/changelog
--- python-oslo.messaging-8.1.3/debian/changelog	2019-05-17 14:33:29.000000000 +0200
+++ python-oslo.messaging-8.1.4/debian/changelog	2019-10-27 18:01:18.000000000 +0100
@@ -1,3 +1,10 @@
+python-oslo.messaging (8.1.4-1+deb10u1) buster; urgency=medium
+
+  * New upstream point release, with an important fix:
+    - Fix switch connection destination when a rabbitmq cluster node disappear.
+
+ -- Thomas Goirand <zigo@debian.org>  Sun, 27 Oct 2019 18:01:18 +0100
+
 python-oslo.messaging (8.1.3-1) unstable; urgency=medium
 
   * New upstream point release, which includes this fix:
diff -Nru python-oslo.messaging-8.1.3/doc/requirements.txt python-oslo.messaging-8.1.4/doc/requirements.txt
--- python-oslo.messaging-8.1.3/doc/requirements.txt	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/doc/requirements.txt	2019-08-09 21:54:57.000000000 +0200
@@ -3,7 +3,8 @@
 # process, which may cause wedges in the gate later.
 
 openstackdocstheme>=1.18.1 # Apache-2.0
-sphinx!=1.6.6,!=1.6.7,>=1.6.2 # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2,<2.0.0;python_version=='2.7' # BSD
+sphinx!=1.6.6,!=1.6.7,>=1.6.2;python_version>='3.4' # BSD
 reno>=2.5.0 # Apache-2.0
 
 # imported when the source code is parsed for generating documentation:
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/amqpdriver.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/amqpdriver.py	2019-08-09 21:54:57.000000000 +0200
@@ -167,8 +167,34 @@
                                      'duration': duration})
                     return
 
+    def heartbeat(self):
+        with self.listener.driver._get_connection(
+                rpc_common.PURPOSE_SEND) as conn:
+            self._send_reply(conn, None, None, ending=False)
+
+    # NOTE(sileht): Those have already be ack in RpcListener IO thread
+    # We keep them as noop until all drivers do the same
     def acknowledge(self):
-        self._message_operations_handler.do(self.message.acknowledge)
+        pass
+
+    def requeue(self):
+        pass
+
+
+class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
+    def acknowledge(self):
+        def _do_ack():
+            try:
+                self.message.acknowledge()
+            except Exception as exc:
+                # NOTE(kgiusti): this failure is likely due to a loss of the
+                # connection to the broker.  Not much we can do in this case,
+                # especially considering the Notification has already been
+                # dispatched. This *could* result in message duplication
+                # (unacked msg is returned to the queue by the broker), but the
+                # driver tries to catch that using the msg_id_cache.
+                LOG.warning("Failed to acknowledge received message: %s", exc)
+        self._message_operations_handler.do(_do_ack)
         self.listener.msg_id_cache.add(self.unique_id)
 
     def requeue(self):
@@ -178,12 +204,12 @@
         # msg_id_cache, the message will be reconsumed, the only difference is
         # the message stay at the beginning of the queue instead of moving to
         # the end.
-        self._message_operations_handler.do(self.message.requeue)
-
-    def heartbeat(self):
-        with self.listener.driver._get_connection(
-                rpc_common.PURPOSE_SEND) as conn:
-            self._send_reply(conn, None, None, ending=False)
+        def _do_requeue():
+            try:
+                self.message.requeue()
+            except Exception as exc:
+                LOG.warning("Failed to requeue received message: %s", exc)
+        self._message_operations_handler.do(_do_requeue)
 
 
 class ObsoleteReplyQueuesCache(object):
@@ -256,7 +282,7 @@
         else:
             LOG.debug("received message with unique_id: %s", unique_id)
 
-        self.incoming.append(AMQPIncomingMessage(
+        self.incoming.append(self.message_cls(
             self,
             ctxt.to_dict(),
             message,
@@ -319,6 +345,41 @@
         self.conn.close()
 
 
+class RpcAMQPListener(AMQPListener):
+    message_cls = AMQPIncomingMessage
+
+    def __call__(self, message):
+        # NOTE(kgiusti): In the original RPC implementation the RPC server
+        # would acknowledge the request THEN process it.  The goal of this was
+        # to prevent duplication if the ack failed.  Should the ack fail the
+        # request would be discarded since the broker would not remove the
+        # request from the queue since no ack was received.  That would lead to
+        # the request being redelivered at some point. However this approach
+        # meant that the ack was issued from the dispatch thread, not the
+        # consumer thread, which is bad since kombu is not thread safe.  So a
+        # change was made to schedule the ack to be sent on the consumer thread
+        # - breaking the ability to catch ack errors before dispatching the
+        # request.  To fix this we do the actual ack here in the consumer
+        # callback and avoid the upcall if the ack fails.  See
+        # https://bugs.launchpad.net/oslo.messaging/+bug/1695746
+        # for all the gory details...
+        try:
+            message.acknowledge()
+        except Exception as exc:
+            LOG.warning("Discarding RPC request due to failed acknowlege: %s",
+                        exc)
+        else:
+            # NOTE(kgiusti): be aware that even if the acknowledge call
+            # succeeds there is no guarantee the broker actually gets the ACK
+            # since acknowledge() simply writes the ACK to the socket (there is
+            # no ACK confirmation coming back from the broker)
+            super(RpcAMQPListener, self).__call__(message)
+
+
+class NotificationAMQPListener(AMQPListener):
+    message_cls = NotificationAMQPIncomingMessage
+
+
 class ReplyWaiters(object):
 
     WAKE_UP = object()
@@ -590,7 +651,7 @@
     def listen(self, target, batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = RpcAMQPListener(self, conn)
 
         conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                     topic=target.topic,
@@ -608,7 +669,7 @@
                                  batch_size, batch_timeout):
         conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
-        listener = AMQPListener(self, conn)
+        listener = NotificationAMQPListener(self, conn)
         for target, priority in targets_and_priorities:
             conn.declare_topic_consumer(
                 exchange_name=self._get_exchange(target),
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/common.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/common.py	2019-08-09 21:54:57.000000000 +0200
@@ -373,7 +373,7 @@
 # greenthread.
 # So, a connection cannot be shared between thread/greenthread and
 # this two variables permit to define the purpose of the connection
-# to allow drivers to add special handling if needed (like heatbeat).
+# to allow drivers to add special handling if needed (like heartbeat).
 # amqp drivers create 3 kind of connections:
 # * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
 # * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py
--- python-oslo.messaging-8.1.3/oslo_messaging/_drivers/impl_rabbit.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/_drivers/impl_rabbit.py	2019-08-09 21:54:57.000000000 +0200
@@ -610,7 +610,7 @@
         # expected waiting the events drain, we start heartbeat_check and
         # retrieve the server heartbeat packet only two times more than
         # the minimum required for the heartbeat works
-        # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
+        # (heartbeat_timeout/heartbeat_rate/2.0, default kombu
         # heartbeat_rate is 2)
         self._heartbeat_wait_timeout = (
             float(self.heartbeat_timeout_threshold) /
@@ -635,7 +635,7 @@
 
         # NOTE(sileht): value chosen according the best practice from kombu
         # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
-        # For heatbeat, we can set a bigger timeout, and check we receive the
+        # For heartbeat, we can set a bigger timeout, and check we receive the
         # heartbeat packets regulary
         if self._heartbeat_supported_and_enabled():
             self._poll_timeout = self._heartbeat_wait_timeout
@@ -974,6 +974,14 @@
     def _heartbeat_thread_job(self):
         """Thread that maintains inactive connections
         """
+        # NOTE(hberaud): Python2 doesn't have ConnectionRefusedError
+        # defined so to switch connections destination on failure
+        # with python2 and python3 we need to wrapp adapt connection refused
+        try:
+            ConnectRefuseError = ConnectionRefusedError
+        except NameError:
+            ConnectRefuseError = socket.error
+
         while not self._heartbeat_exit_event.is_set():
             with self._connection_lock.for_heartbeat():
 
@@ -990,14 +998,24 @@
                             self.connection.drain_events(timeout=0.001)
                         except socket.timeout:
                             pass
+                    # NOTE(hberaud): In a clustered rabbitmq when
+                    # a node disappears, we get a ConnectionRefusedError
+                    # because the socket get disconnected.
+                    # The socket access yields a OSError because the heartbeat
+                    # tries to reach an unreachable host (No route to host).
+                    # Catch these exceptions to ensure that we call
+                    # ensure_connection for switching the
+                    # connection destination.
                     except (socket.timeout,
+                            ConnectRefuseError,
+                            OSError,
                             kombu.exceptions.OperationalError) as exc:
                         LOG.info(_LI("A recoverable connection/channel error "
                                      "occurred, trying to reconnect: %s"), exc)
                         self.ensure_connection()
                 except Exception:
-                    LOG.warning(_LW("Unexpected error during heartbeart "
-                                    "thread processing, retrying..."))
+                    LOG.warning(_LW("Unexpected error during heartbeat "
+                                "thread processing, retrying..."))
                     LOG.debug('Exception', exc_info=True)
 
             self._heartbeat_exit_event.wait(
diff -Nru python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py
--- python-oslo.messaging-8.1.3/oslo_messaging/rpc/server.py	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/oslo_messaging/rpc/server.py	2019-08-09 21:54:57.000000000 +0200
@@ -152,6 +152,9 @@
 
     def _process_incoming(self, incoming):
         message = incoming[0]
+
+        # TODO(sileht): We should remove that at some point and do
+        # this directly in the driver
         try:
             message.acknowledge()
         except Exception:
diff -Nru python-oslo.messaging-8.1.3/test-requirements.txt python-oslo.messaging-8.1.4/test-requirements.txt
--- python-oslo.messaging-8.1.3/test-requirements.txt	2019-04-23 09:50:41.000000000 +0200
+++ python-oslo.messaging-8.1.4/test-requirements.txt	2019-08-09 21:54:57.000000000 +0200
@@ -31,7 +31,7 @@
 pyngus>=2.2.0 # Apache-2.0
 
 # Bandit security code scanner
-bandit>=1.1.0 # Apache-2.0
+bandit>=1.1.0,<1.6.0 # Apache-2.0
 
 eventlet!=0.18.3,!=0.20.1,>=0.18.2 # MIT
 greenlet>=0.4.10 # MIT

Reply to: