Use inspect.getgeneratorstate in asynpool.gen_not_started (#7476)

* Use inspect.getgeneratorstate in asynpool.gen_not_started

This improves compatibility with the nogil Python fork, which does not
have the gi_frame attribute on generators.

* Add additional tests for gen_not_started

Checks that gen_not_started is not true while the generator is running
and after the generator has exited due to an exception.
pull/7337/merge
Sam Gross 5 months ago committed by GitHub
parent 0a3487b882
commit 6484ea0a4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      celery/concurrency/asynpool.py
  2. 12
      t/unit/concurrency/test_prefork.py

@ -14,6 +14,7 @@ This code deals with three major challenges:
"""
import errno
import gc
import inspect
import os
import select
import time
@ -89,8 +90,7 @@ Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
def gen_not_started(gen):
"""Return true if generator is not started."""
# gi_frame is None when generator stopped.
return gen.gi_frame and gen.gi_frame.f_lasti == -1
return inspect.getgeneratorstate(gen) == "GEN_CREATED"
def _get_job_writer(job):

@ -201,6 +201,7 @@ class test_AsynPool:
def gen():
yield 1
assert not asynpool.gen_not_started(g)
yield 2
g = gen()
assert asynpool.gen_not_started(g)
@ -209,6 +210,17 @@ class test_AsynPool:
list(g)
assert not asynpool.gen_not_started(g)
def gen2():
yield 1
raise RuntimeError('generator error')
g = gen2()
assert asynpool.gen_not_started(g)
next(g)
assert not asynpool.gen_not_started(g)
with pytest.raises(RuntimeError):
next(g)
assert not asynpool.gen_not_started(g)
@patch('select.select', create=True)
def test_select(self, __select):
ebadf = socket.error()

Loading…
Cancel
Save