Browse Source

Worker should exit with ctx.exit to get the right exitcode for non-zero cases (#7544)

* Worker should exit with ctx.exit to get the right exitcode for non-zero cases

* Add fast-fail coverage to worker

* Add unit test for celery worker exit

* Fix non-encapsulated test app

* Use test celery project

* Use solo pool to try and fix windows thread issues

* Disable capture to aid test debug
pull/7553/head
Tom Parker-Shemilt 2 months ago committed by GitHub
parent
commit
617a757c7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      celery/bin/worker.py
  2. 1
      requirements/test.txt
  3. 18
      t/integration/test_worker.py
  4. 12
      t/integration/test_worker_config.py
  5. 4
      t/unit/app/test_app.py
  6. 1
      t/unit/bin/proj/app.py
  7. 20
      t/unit/bin/test_worker.py
  8. 93
      t/unit/contrib/test_worker.py
  9. 2
      tox.ini

2
celery/bin/worker.py

@ -351,7 +351,7 @@ def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
quiet=ctx.obj.quiet,
**kwargs)
worker.start()
return worker.exitcode
ctx.exit(worker.exitcode)
except SecurityError as e:
ctx.obj.error(e.args[0])
ctx.exit(1)

1
requirements/test.txt

@ -2,6 +2,7 @@ pytest~=7.1.1
pytest-celery
pytest-subtests==0.8.0
pytest-timeout~=2.1.0
pytest-click
boto3>=1.9.178
moto>=2.2.6
# typing extensions

18
t/integration/test_worker.py

@ -0,0 +1,18 @@
import subprocess
import pytest
def test_run_worker():
with pytest.raises(subprocess.CalledProcessError) as exc_info:
subprocess.check_output(
["celery", "--config", "t.integration.test_worker_config", "worker"],
stderr=subprocess.STDOUT)
called_process_error = exc_info.value
assert called_process_error.returncode == 1, called_process_error
output = called_process_error.output.decode('utf-8')
assert output.find(
"Retrying to establish a connection to the message broker after a connection "
"loss has been disabled (app.conf.broker_connection_retry_on_startup=False). "
"Shutting down...") != -1, output

12
t/integration/test_worker_config.py

@ -0,0 +1,12 @@
# Test config for t/integration/test_worker.py
broker_url = 'amqp://guest:guest@foobar:1234//'
# Fail fast for test_run_worker
broker_connection_retry_on_startup = False
broker_connection_retry = False
broker_connection_timeout = 0
worker_log_color = False
worker_redirect_stdouts = False

4
t/unit/app/test_app.py

@ -591,8 +591,8 @@ class test_App:
mocked_celery.main.assert_called_with(
args=['worker', '--help'], standalone_mode=False)
def test_config_from_envvar(self):
os.environ['CELERYTEST_CONFIG_OBJECT'] = 't.unit.app.test_app'
def test_config_from_envvar(self, monkeypatch):
monkeypatch.setenv("CELERYTEST_CONFIG_OBJECT", 't.unit.app.test_app')
self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')
assert self.app.conf.THIS_IS_A_KEY == 'this is a value'

1
t/unit/bin/proj/app.py

@ -1,3 +1,4 @@
from celery import Celery
app = Celery(set_as_current=False)
app.config_from_object("t.integration.test_worker_config")

20
t/unit/bin/test_worker.py

@ -0,0 +1,20 @@
import pytest
from click.testing import CliRunner
from celery.app.log import Logging
from celery.bin.celery import celery
@pytest.fixture(scope='session')
def use_celery_app_trap():
return False
def test_cli(isolated_cli_runner: CliRunner):
Logging._setup = True # To avoid hitting the logging sanity checks
res = isolated_cli_runner.invoke(
celery,
["-A", "t.unit.bin.proj.app", "worker", "--pool", "solo"],
catch_exceptions=False
)
assert res.exit_code == 1, (res, res.stdout)

93
t/unit/contrib/test_worker.py

@ -1,56 +1,47 @@
import pytest
# this import adds a @shared_task, which uses connect_on_app_finalize
# to install the celery.ping task that the test lib uses
import celery.contrib.testing.tasks # noqa: F401
from celery import Celery
from celery.contrib.testing.worker import start_worker
app = Celery('celerytest',
backend='cache+memory://',
broker='memory://',
)
@app.task
def add(x, y):
return x + y
def test_start_worker():
app.config_from_object({
'worker_hijack_root_logger': False,
})
# this import adds a @shared_task, which uses connect_on_app_finalize
# to install the celery.ping task that the test lib uses
import celery.contrib.testing.tasks # noqa: F401
# to avoid changing the root logger level to ERROR,
# we have we have to set both app.log.loglevel start_worker arg to 0
# (see celery.app.log.setup_logging_subsystem)
app.log.loglevel = 0
with start_worker(app=app, loglevel=0):
result = add.s(1, 2).apply_async()
val = result.get(timeout=5)
assert val == 3
@app.task
def error_task():
raise NotImplementedError()
def test_start_worker_with_exception():
"""Make sure that start_worker does not hang on exception"""
app.config_from_object({
'worker_hijack_root_logger': False,
})
# this import adds a @shared_task, which uses connect_on_app_finalize
# to install the celery.ping task that the test lib uses
import celery.contrib.testing.tasks # noqa: F401
# to avoid changing the root logger level to ERROR,
# we have we have to set both app.log.loglevel start_worker arg to 0
# (see celery.app.log.setup_logging_subsystem)
app.log.loglevel = 0
with pytest.raises(NotImplementedError):
with start_worker(app=app, loglevel=0):
result = error_task.apply_async()
result.get(timeout=5)
class test_worker:
def setup(self):
self.app = Celery('celerytest', backend='cache+memory://', broker='memory://',)
@self.app.task
def add(x, y):
return x + y
self.add = add
@self.app.task
def error_task():
raise NotImplementedError()
self.error_task = error_task
self.app.config_from_object({
'worker_hijack_root_logger': False,
})
# to avoid changing the root logger level to ERROR,
# we have we have to set both app.log.loglevel start_worker arg to 0
# (see celery.app.log.setup_logging_subsystem)
self.app.log.loglevel = 0
def test_start_worker(self):
with start_worker(app=self.app, loglevel=0):
result = self.add.s(1, 2).apply_async()
val = result.get(timeout=5)
assert val == 3
def test_start_worker_with_exception(self):
"""Make sure that start_worker does not hang on exception"""
with pytest.raises(NotImplementedError):
with start_worker(app=self.app, loglevel=0):
result = self.error_task.apply_async()
result.get(timeout=5)

2
tox.ini

@ -41,7 +41,7 @@ deps=
bandit: bandit
commands =
unit: pytest --maxfail=10 -v --cov=celery --cov-report=xml --cov-report term {posargs}
unit: pytest --maxfail=10 --capture=no -v --cov=celery --cov-report=xml --cov-report term {posargs}
integration: pytest -xsv t/integration {posargs}
setenv =
PIP_EXTRA_INDEX_URL=https://celery.github.io/celery-wheelhouse/repo/simple/

Loading…
Cancel
Save