Browse Source

async chords should pass it's kwargs to the group/body

pull/7602/head
Eric Yen 2 months ago committed by Asif Saif Uddin
parent
commit
ec3714edf3
  1. 6
      celery/canvas.py
  2. 30
      t/integration/test_canvas.py
  3. 2
      t/unit/tasks/test_chord.py

6
celery/canvas.py

@ -1705,7 +1705,7 @@ class _chord(Signature):
task_id = option_task_id
# chord([A, B, ...], C)
return self.run(tasks, body, args, task_id=task_id, **merged_options)
return self.run(tasks, body, args, task_id=task_id, kwargs=kwargs, **merged_options)
def apply(self, args=None, kwargs=None,
propagate=True, body=None, **options):
@ -1755,7 +1755,7 @@ class _chord(Signature):
def run(self, header, body, partial_args, app=None, interval=None,
countdown=1, max_retries=None, eager=False,
task_id=None, **options):
task_id=None, kwargs=None, **options):
app = app or self._get_app(body)
group_id = header.options.get('task_id') or uuid()
root_id = body.options.get('root_id')
@ -1782,7 +1782,7 @@ class _chord(Signature):
countdown=countdown,
max_retries=max_retries,
)
header_result = header(*partial_args, task_id=group_id, **options)
header_result = header.apply_async(partial_args, kwargs, task_id=group_id, **options)
# The execution of a chord body is normally triggered by its header's
# tasks completing. If the header is empty this will never happen, so
# we execute the body manually here.

30
t/integration/test_canvas.py

@ -1424,6 +1424,36 @@ class test_chord:
res = c()
assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
def test_group_kwargs(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])
c = (
add.s(2, 2) |
group(add.s(i) for i in range(4)) |
add_to_all.s(8)
)
res = c.apply_async(kwargs={"z": 1})
assert res.get(timeout=TIMEOUT) == [13, 14, 15, 16]
def test_group_args_and_kwargs(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])
c = (
group(add.s(i) for i in range(4)) |
add_to_all.s(8)
)
res = c.apply_async(args=(4,), kwargs={"z": 1})
if manager.app.conf.result_backend.startswith('redis'):
# for a simple chord like the one above, redis does not guarantee
# the ordering of the results as a performance trade off.
assert set(res.get(timeout=TIMEOUT)) == {13, 14, 15, 16}
else:
assert res.get(timeout=TIMEOUT) == [13, 14, 15, 16]
def test_nested_group_chain(self, manager):
try:
manager.app.backend.ensure_chords_allowed()

2
t/unit/tasks/test_chord.py

@ -232,6 +232,7 @@ class test_unlock_chord_task(ChordCase):
mul.s(),
(),
task_id=None,
kwargs={},
interval=10,
groups=[ch.tasks.id],
stamped_headers=['groups']
@ -255,6 +256,7 @@ class test_unlock_chord_task(ChordCase):
mul.s(),
(),
task_id=sentinel.task_id,
kwargs={},
interval=10,
groups=[ch.tasks.id],
stamped_headers=['groups']

Loading…
Cancel
Save