[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

Bug#1064276: marked as done (bookworm-pu: package python-channels-redis/4.0.0-1+deb12u1)



Your message dated Sat, 29 Jun 2024 10:46:16 +0000
with message-id <E1sNVay-002bZO-5e@coccia.debian.org>
and subject line Released with 12.6
has caused the Debian Bug report #1064276,
regarding bookworm-pu: package python-channels-redis/4.0.0-1+deb12u1
to be marked as done.

This means that you claim that the problem has been dealt with.
If this is not the case it is now your responsibility to reopen the
Bug report if necessary, and/or fix the problem forthwith.

(NB: If you are a system administrator and have no idea what this
message is talking about, this may indicate a serious mail system
misconfiguration somewhere. Please contact owner@bugs.debian.org
immediately.)


-- 
1064276: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1064276
Debian Bug Tracking System
Contact owner@bugs.debian.org with problems
--- Begin Message ---
Package: release.debian.org
Severity: normal
Tags: bookworm
User: release.debian.org@packages.debian.org
Usertags: pu
X-Debbugs-Cc: python-channels-redis@packages.debian.org, Michael Fladischer <fladi@debian.org>
Control: affects -1 + src:python-channels-redis

[ Reason ]
The version of python-channels-redis in bookworm suffers from
https://bugs.debian.org/1027387 /
https://github.com/django/channels_redis/issues/332, which was
introduced in 4.0.0 and is a regression from bullseye.  I ran into this
while working on debusine.

[ Impact ]
I believe that any application that (a) runs Celery, (b) uses
channels-redis, and (c) sends to a channel via async_to_sync in a Celery
task without explicitly closing redis pools will encounter RuntimeErrors
as per the upstream bug report above.

I was able to work around this in my application
(https://salsa.debian.org/freexian-team/debusine/-/merge_requests/594),
but I lost a few hours to figuring this out and it would be good if
others didn't have to.

[ Tests ]
It's covered by the upstream test suite, which is run via autopkgtest.

I also confirmed that this change fixes the problems I ran into in
debusine, without applying any workarounds.

[ Risks ]
The alternative is letting applications work around this individually,
which I think on balance is probably worse - I initially tried switching
to RedisPubSubChannelLayer instead, which did more or less work but
required various other rather strange workarounds.

[ Checklist ]
  [x] *all* changes are documented in the d/changelog
  [x] I reviewed all changes and I approve them
  [x] attach debdiff against the package in (old)stable
  [x] the issue is verified as fixed in unstable

[ Changes ]
This is a straight cherry-pick of an upstream change to close redis
connection pools when closing the asyncio loop.

-- 
Colin Watson (he/him)                              [cjwatson@debian.org]
diff -Nru python-channels-redis-4.0.0/debian/changelog python-channels-redis-4.0.0/debian/changelog
--- python-channels-redis-4.0.0/debian/changelog	2022-10-10 21:13:47.000000000 +0100
+++ python-channels-redis-4.0.0/debian/changelog	2024-02-19 12:04:40.000000000 +0000
@@ -1,3 +1,11 @@
+python-channels-redis (4.0.0-1+deb12u1) UNRELEASED; urgency=medium
+
+  * Team upload.
+  * Cherry-pick from upstream:
+    - Ensure pools are closed on loop close in core (closes: #1027387).
+
+ -- Colin Watson <cjwatson@debian.org>  Mon, 19 Feb 2024 12:04:40 +0000
+
 python-channels-redis (4.0.0-1) unstable; urgency=medium
 
   * New upstream release.
diff -Nru python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch
--- python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch	1970-01-01 01:00:00.000000000 +0100
+++ python-channels-redis-4.0.0/debian/patches/loop-close-pools-in-core.patch	2024-02-19 12:04:34.000000000 +0000
@@ -0,0 +1,227 @@
+From: Devid <setti.davide89@gmail.com>
+Date: Tue, 28 Mar 2023 11:30:35 +0200
+Subject: Assured pools are closed on loop close in core (#347)
+
+Origin: upstream, https://github.com/django/channels_redis/pull/347
+Bug: https://github.com/django/channels_redis/issues/332
+Bug-Debian: https://bugs.debian.org/1027387
+Last-Update: 2024-02-19
+---
+ channels_redis/core.py   | 64 ++++++++++++++++++++++++++++--------------------
+ channels_redis/pubsub.py | 18 +-------------
+ channels_redis/utils.py  | 16 ++++++++++++
+ 3 files changed, 54 insertions(+), 44 deletions(-)
+
+diff --git a/channels_redis/core.py b/channels_redis/core.py
+index 7c04ecd..c3eb3b3 100644
+--- a/channels_redis/core.py
++++ b/channels_redis/core.py
+@@ -15,7 +15,7 @@ from redis import asyncio as aioredis
+ from channels.exceptions import ChannelFull
+ from channels.layers import BaseChannelLayer
+ 
+-from .utils import _consistent_hash
++from .utils import _consistent_hash, _wrap_close
+ 
+ logger = logging.getLogger(__name__)
+ 
+@@ -69,6 +69,26 @@ class BoundedQueue(asyncio.Queue):
+         return super(BoundedQueue, self).put_nowait(item)
+ 
+ 
++class RedisLoopLayer:
++    def __init__(self, channel_layer):
++        self._lock = asyncio.Lock()
++        self.channel_layer = channel_layer
++        self._connections = {}
++
++    def get_connection(self, index):
++        if index not in self._connections:
++            pool = self.channel_layer.create_pool(index)
++            self._connections[index] = aioredis.Redis(connection_pool=pool)
++
++        return self._connections[index]
++
++    async def flush(self):
++        async with self._lock:
++            for index in list(self._connections):
++                connection = self._connections.pop(index)
++                await connection.close(close_connection_pool=True)
++
++
+ class RedisChannelLayer(BaseChannelLayer):
+     """
+     Redis channel layer.
+@@ -101,8 +121,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         self.hosts = self.decode_hosts(hosts)
+         self.ring_size = len(self.hosts)
+         # Cached redis connection pools and the event loop they are from
+-        self.pools = {}
+-        self.pools_loop = None
++        self._layers = {}
+         # Normal channels choose a host index by cycling through the available hosts
+         self._receive_index_generator = itertools.cycle(range(len(self.hosts)))
+         self._send_index_generator = itertools.cycle(range(len(self.hosts)))
+@@ -138,7 +157,7 @@ class RedisChannelLayer(BaseChannelLayer):
+             return aioredis.sentinel.SentinelConnectionPool(
+                 master_name,
+                 aioredis.sentinel.Sentinel(sentinels, sentinel_kwargs=sentinel_kwargs),
+-                **host
++                **host,
+             )
+         else:
+             return aioredis.ConnectionPool(**host)
+@@ -331,7 +350,7 @@ class RedisChannelLayer(BaseChannelLayer):
+ 
+                         raise
+ 
+-                    message, token, exception = None, None, None
++                    message = token = exception = None
+                     for task in done:
+                         try:
+                             result = task.result()
+@@ -367,7 +386,7 @@ class RedisChannelLayer(BaseChannelLayer):
+                             message_channel, message = await self.receive_single(
+                                 real_channel
+                             )
+-                            if type(message_channel) is list:
++                            if isinstance(message_channel, list):
+                                 for chan in message_channel:
+                                     self.receive_buffer[chan].put_nowait(message)
+                             else:
+@@ -459,11 +478,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         Returns a new channel name that can be used by something in our
+         process as a specific channel.
+         """
+-        return "%s.%s!%s" % (
+-            prefix,
+-            self.client_prefix,
+-            uuid.uuid4().hex,
+-        )
++        return f"{prefix}.{self.client_prefix}!{uuid.uuid4().hex}"
+ 
+     ### Flush extension ###
+ 
+@@ -496,9 +511,8 @@ class RedisChannelLayer(BaseChannelLayer):
+         # Flush all cleaners, in case somebody just wanted to close the
+         # pools without flushing first.
+         await self.wait_received()
+-
+-        for index in self.pools:
+-            await self.pools[index].disconnect()
++        for layer in self._layers.values():
++            await layer.flush()
+ 
+     async def wait_received(self):
+         """
+@@ -667,7 +681,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         """
+         Common function to make the storage key for the group.
+         """
+-        return ("%s:group:%s" % (self.prefix, group)).encode("utf8")
++        return f"{self.prefix}:group:{group}".encode("utf8")
+ 
+     ### Serialization ###
+ 
+@@ -711,7 +725,7 @@ class RedisChannelLayer(BaseChannelLayer):
+         return Fernet(formatted_key)
+ 
+     def __str__(self):
+-        return "%s(hosts=%s)" % (self.__class__.__name__, self.hosts)
++        return f"{self.__class__.__name__}(hosts={self.hosts})"
+ 
+     ### Connection handling ###
+ 
+@@ -723,18 +737,14 @@ class RedisChannelLayer(BaseChannelLayer):
+         # Catch bad indexes
+         if not 0 <= index < self.ring_size:
+             raise ValueError(
+-                "There are only %s hosts - you asked for %s!" % (self.ring_size, index)
++                f"There are only {self.ring_size} hosts - you asked for {index}!"
+             )
+ 
++        loop = asyncio.get_running_loop()
+         try:
+-            loop = asyncio.get_running_loop()
+-            if self.pools_loop != loop:
+-                self.pools = {}
+-                self.pools_loop = loop
+-        except RuntimeError:
+-            pass
+-
+-        if index not in self.pools:
+-            self.pools[index] = self.create_pool(index)
++            layer = self._layers[loop]
++        except KeyError:
++            _wrap_close(self, loop)
++            layer = self._layers[loop] = RedisLoopLayer(self)
+ 
+-        return aioredis.Redis(connection_pool=self.pools[index])
++        return layer.get_connection(index)
+diff --git a/channels_redis/pubsub.py b/channels_redis/pubsub.py
+index 3c10378..550325b 100644
+--- a/channels_redis/pubsub.py
++++ b/channels_redis/pubsub.py
+@@ -1,32 +1,16 @@
+ import asyncio
+ import functools
+ import logging
+-import types
+ import uuid
+ 
+ import msgpack
+ from redis import asyncio as aioredis
+ 
+-from .utils import _consistent_hash
++from .utils import _consistent_hash, _wrap_close
+ 
+ logger = logging.getLogger(__name__)
+ 
+ 
+-def _wrap_close(proxy, loop):
+-    original_impl = loop.close
+-
+-    def _wrapper(self, *args, **kwargs):
+-        if loop in proxy._layers:
+-            layer = proxy._layers[loop]
+-            del proxy._layers[loop]
+-            loop.run_until_complete(layer.flush())
+-
+-        self.close = original_impl
+-        return self.close(*args, **kwargs)
+-
+-    loop.close = types.MethodType(_wrapper, loop)
+-
+-
+ async def _async_proxy(obj, name, *args, **kwargs):
+     # Must be defined as a function and not a method due to
+     # https://bugs.python.org/issue38364
+diff --git a/channels_redis/utils.py b/channels_redis/utils.py
+index 7b30fdc..d2405bb 100644
+--- a/channels_redis/utils.py
++++ b/channels_redis/utils.py
+@@ -1,4 +1,5 @@
+ import binascii
++import types
+ 
+ 
+ def _consistent_hash(value, ring_size):
+@@ -15,3 +16,18 @@ def _consistent_hash(value, ring_size):
+     bigval = binascii.crc32(value) & 0xFFF
+     ring_divisor = 4096 / float(ring_size)
+     return int(bigval / ring_divisor)
++
++
++def _wrap_close(proxy, loop):
++    original_impl = loop.close
++
++    def _wrapper(self, *args, **kwargs):
++        if loop in proxy._layers:
++            layer = proxy._layers[loop]
++            del proxy._layers[loop]
++            loop.run_until_complete(layer.flush())
++
++        self.close = original_impl
++        return self.close(*args, **kwargs)
++
++    loop.close = types.MethodType(_wrapper, loop)
diff -Nru python-channels-redis-4.0.0/debian/patches/series python-channels-redis-4.0.0/debian/patches/series
--- python-channels-redis-4.0.0/debian/patches/series	1970-01-01 01:00:00.000000000 +0100
+++ python-channels-redis-4.0.0/debian/patches/series	2024-02-19 12:04:34.000000000 +0000
@@ -0,0 +1 @@
+loop-close-pools-in-core.patch

--- End Message ---
--- Begin Message ---
Version: 12.6

The upload requested in this bug has been released as part of 12.6.

--- End Message ---

Reply to: