Browse Source

Extend cassandra to cover AstraDB as well (#7356)

* Cassandra backend: bumped driver to v3.24 to support Astra DB,
adapted the backend code to that effect, introduced new setting
`cassandra_secure_bundle_path` and updated the documentation
to reflect this.

* edits to docs - configuration for cassandra

* Update requirements/extras/cassandra.txt

Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>

* Update README.rst

Co-authored-by: Omer Katz <omer.katz@omerkatz.com>

* Cassandra backend for Astra: more test coverage, more docs, driver version bumped

Co-authored-by: Stefano Lottini <stefano.lottini@datastax.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
Co-authored-by: Omer Katz <omer.katz@omerkatz.com>
pull/7337/merge
Stefano Lottini 4 months ago committed by GitHub
parent
commit
8d35c655d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CONTRIBUTING.rst
  2. 2
      README.rst
  3. 1
      celery/app/defaults.py
  4. 42
      celery/backends/cassandra.py
  5. 2
      docs/includes/installation.txt
  6. 111
      docs/userguide/configuration.rst
  7. 2
      requirements/extras/cassandra.txt
  8. 39
      t/unit/backends/test_cassandra.py

2
CONTRIBUTING.rst

@ -1170,7 +1170,7 @@ that require third-party libraries must be added.
.. code-block:: console
$ pip install -U requirements/pkgutils.txt
$ pip install -U -r requirements/pkgutils.txt
$ make readme

2
README.rst

@ -307,7 +307,7 @@ Transports and Backends
for using Memcached as a result backend (pure-Python implementation).
:``celery[cassandra]``:
for using Apache Cassandra as a result backend with DataStax driver.
for using Apache Cassandra/Astra DB as a result backend with the DataStax driver.
:``celery[azureblockblob]``:
for using Azure Storage as a result backend (using ``azure-storage``)

1
celery/app/defaults.py

@ -114,6 +114,7 @@ NAMESPACES = Namespace(
port=Option(type='string'),
read_consistency=Option(type='string'),
servers=Option(type='list'),
bundle_path=Option(type='string'),
table=Option(type='string'),
write_consistency=Option(type='string'),
auth_provider=Option(type='string'),

42
celery/backends/cassandra.py

@ -30,6 +30,10 @@ CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
"""
E_CASSANDRA_MISCONFIGURED = 'Cassandra backend improperly configured.'
E_CASSANDRA_NOT_CONFIGURED = 'Cassandra backend not configured.'
Q_INSERT_RESULT = """
INSERT INTO {table} (
task_id, status, result, date_done, traceback, children) VALUES (
@ -65,21 +69,24 @@ def buf_t(x):
class CassandraBackend(BaseBackend):
"""Cassandra backend utilizing DataStax driver.
"""Cassandra/AstraDB backend utilizing DataStax driver.
Raises:
celery.exceptions.ImproperlyConfigured:
if module :pypi:`cassandra-driver` is not available,
or if the :setting:`cassandra_servers` setting is not set.
or if not exactly one of the :setting:`cassandra_servers` and
the :setting:`cassandra_secure_bundle_path` settings is set.
"""
#: List of Cassandra servers with format: ``hostname``.
servers = None
#: Location of the secure connect bundle zipfile (absolute path).
bundle_path = None
supports_autoexpire = True # autoexpire supported via entry_ttl
def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
port=9042, **kwargs):
port=9042, bundle_path=None, **kwargs):
super().__init__(**kwargs)
if not cassandra:
@ -87,13 +94,20 @@ class CassandraBackend(BaseBackend):
conf = self.app.conf
self.servers = servers or conf.get('cassandra_servers', None)
self.bundle_path = bundle_path or conf.get(
'cassandra_secure_bundle_path', None)
self.port = port or conf.get('cassandra_port', None)
self.keyspace = keyspace or conf.get('cassandra_keyspace', None)
self.table = table or conf.get('cassandra_table', None)
self.cassandra_options = conf.get('cassandra_options', {})
if not self.servers or not self.keyspace or not self.table:
raise ImproperlyConfigured('Cassandra backend not configured.')
# either servers or bundle path must be provided...
db_directions = self.servers or self.bundle_path
if not db_directions or not self.keyspace or not self.table:
raise ImproperlyConfigured(E_CASSANDRA_NOT_CONFIGURED)
# ...but not both:
if self.servers and self.bundle_path:
raise ImproperlyConfigured(E_CASSANDRA_MISCONFIGURED)
expires = entry_ttl or conf.get('cassandra_entry_ttl', None)
@ -137,10 +151,20 @@ class CassandraBackend(BaseBackend):
try:
if self._session is not None:
return
self._cluster = cassandra.cluster.Cluster(
self.servers, port=self.port,
auth_provider=self.auth_provider,
**self.cassandra_options)
# using either 'servers' or 'bundle_path' here:
if self.servers:
self._cluster = cassandra.cluster.Cluster(
self.servers, port=self.port,
auth_provider=self.auth_provider,
**self.cassandra_options)
else:
# 'bundle_path' is guaranteed to be set
self._cluster = cassandra.cluster.Cluster(
cloud={
'secure_connect_bundle': self.bundle_path,
},
auth_provider=self.auth_provider,
**self.cassandra_options)
self._session = self._cluster.connect(self.keyspace)
# We're forced to do concatenation below, as formatting would

2
docs/includes/installation.txt

@ -77,7 +77,7 @@ Transports and Backends
for using Memcached as a result backend (pure-Python implementation).
:``celery[cassandra]``:
for using Apache Cassandra as a result backend with DataStax driver.
for using Apache Cassandra/Astra DB as a result backend with the DataStax driver.
:``celery[couchbase]``:
for using Couchbase as a result backend.

111
docs/userguide/configuration.rst

@ -1314,13 +1314,19 @@ used by the redis result backend.
.. _conf-cassandra-result-backend:
Cassandra backend settings
--------------------------
Cassandra/AstraDB backend settings
----------------------------------
.. note::
This Cassandra backend driver requires :pypi:`cassandra-driver`.
This backend can refer to either a regular Cassandra installation
or a managed Astra DB instance. Depending on which one, exactly one
of the :setting:`cassandra_servers` and
:setting:`cassandra_secure_bundle_path` settings must be provided
(but not both).
To install, use :command:`pip`:
.. code-block:: console
@ -1339,10 +1345,32 @@ This backend requires the following configuration directives to be set.
Default: ``[]`` (empty list).
List of ``host`` Cassandra servers. For example::
List of ``host`` Cassandra servers. This must be provided when connecting to
a Cassandra cluster. This setting is mutually exclusive
with :setting:`cassandra_secure_bundle_path`. Example::
cassandra_servers = ['localhost']
.. setting:: cassandra_secure_bundle_path
``cassandra_secure_bundle_path``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Default: None.
Absolute path to the secure-connect-bundle zip file to connect
to an Astra DB instance. This setting is mutually exclusive
with :setting:`cassandra_servers`.
Example::
cassandra_secure_bundle_path = '/home/user/bundles/secure-connect.zip'
When connecting to Astra DB, it is necessary to specify
the plain-text auth provider and the associated username and password,
which take the value of the Client ID and the Client Secret, respectively,
of a valid token generated for the Astra DB instance.
See below for an Astra DB configuration example.
.. setting:: cassandra_port
``cassandra_port``
@ -1359,7 +1387,7 @@ Port to contact the Cassandra servers on.
Default: None.
The key-space in which to store the results. For example::
The keyspace in which to store the results. For example::
cassandra_keyspace = 'tasks_keyspace'
@ -1446,18 +1474,85 @@ Named arguments to pass into the ``cassandra.cluster`` class.
'protocol_version': 3
}
Example configuration
~~~~~~~~~~~~~~~~~~~~~
Example configuration (Cassandra)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
result_backend = 'cassandra://'
cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_read_consistency = 'QUORUM'
cassandra_write_consistency = 'QUORUM'
cassandra_entry_ttl = 86400
Example configuration (Astra DB)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. code-block:: python
result_backend = 'cassandra://'
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'QUORUM'
cassandra_write_consistency = 'QUORUM'
cassandra_auth_provider = 'PlainTextAuthProvider'
cassandra_auth_kwargs = {
'username': '<<CLIENT_ID_FROM_ASTRA_DB_TOKEN>>',
'password': '<<CLIENT_SECRET_FROM_ASTRA_DB_TOKEN>>'
}
cassandra_secure_bundle_path = '/path/to/secure-connect-bundle.zip'
cassandra_entry_ttl = 86400
Additional configuration
~~~~~~~~~~~~~~~~~~~~~~~~
The Cassandra driver, when establishing the connection, undergoes a stage
of negotiating the protocol version with the server(s). Similarly,
a load-balancing policy is automatically supplied (by default
``DCAwareRoundRobinPolicy``, which in turn has a ``local_dc`` setting, also
determined by the driver upon connection).
When possible, one should explicitly provide these in the configuration;
moreover, future versions of the Cassandra driver will require at least the
load-balancing policy to be specified (using `execution profiles <https://docs.datastax.com/en/developer/python-driver/3.25/execution_profiles/>`_,
as shown below).
A full configuration for the Cassandra backend would thus have the
following additional lines:
.. code-block:: python
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.cluster import ExecutionProfile
from cassandra.cluster import EXEC_PROFILE_DEFAULT
myEProfile = ExecutionProfile(
load_balancing_policy=DCAwareRoundRobinPolicy(
local_dc='datacenter1', # replace with your DC name
)
)
cassandra_options = {
'protocol_version': 5, # for Cassandra 4, change if needed
'execution_profiles': {EXEC_PROFILE_DEFAULT: myEProfile},
}
And similarly for Astra DB:
.. code-block:: python
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.cluster import ExecutionProfile
from cassandra.cluster import EXEC_PROFILE_DEFAULT
myEProfile = ExecutionProfile(
load_balancing_policy=DCAwareRoundRobinPolicy(
local_dc='europe-west1', # for Astra DB, region name = dc name
)
)
cassandra_options = {
'protocol_version': 4, # for Astra DB
'execution_profiles': {EXEC_PROFILE_DEFAULT: myEProfile},
}
.. _conf-s3-result-backend:
S3 backend settings

2
requirements/extras/cassandra.txt

@ -1 +1 @@
cassandra-driver<3.21.0
cassandra-driver>=3.24.0,<4

39
t/unit/backends/test_cassandra.py

@ -53,13 +53,50 @@ class test_CassandraBackend:
cons.LOCAL_FOO = 'bar'
mod.CassandraBackend(app=self.app)
# no servers raises ImproperlyConfigured
# no servers and no bundle_path raises ImproperlyConfigured
with pytest.raises(ImproperlyConfigured):
self.app.conf.cassandra_servers = None
self.app.conf.cassandra_secure_bundle_path = None
mod.CassandraBackend(
app=self.app, keyspace='b', column_family='c',
)
# both servers and bundle_path raises ImproperlyConfigured
with pytest.raises(ImproperlyConfigured):
self.app.conf.cassandra_servers = ['localhost']
self.app.conf.cassandra_secure_bundle_path = (
'/home/user/secure-connect-bundle.zip')
mod.CassandraBackend(
app=self.app, keyspace='b', column_family='c',
)
def test_init_with_cloud(self):
# Tests behavior when Cluster.connect works properly
# and cluster is created with 'cloud' param instead of 'contact_points'
from celery.backends import cassandra as mod
class DummyClusterWithBundle:
def __init__(self, *args, **kwargs):
if args != ():
# this cluster is supposed to be created with 'cloud=...'
raise ValueError('I should be created with kwargs only')
pass
def connect(self, *args, **kwargs):
return Mock()
mod.cassandra = Mock()
mod.cassandra.cluster = Mock()
mod.cassandra.cluster.Cluster = DummyClusterWithBundle
self.app.conf.cassandra_secure_bundle_path = '/path/to/bundle.zip'
self.app.conf.cassandra_servers = None
x = mod.CassandraBackend(app=self.app)
x._get_connection()
assert isinstance(x._cluster, DummyClusterWithBundle)
@pytest.mark.patched_module(*CASSANDRA_MODULES)
@pytest.mark.usefixtures('depends_on_current_app')
def test_reduce(self, module):

Loading…
Cancel
Save