Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Threaded MutationsBatcher #722

Merged
merged 24 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add flush_interval.
Raise error when mutating when batcher is already closed.
  • Loading branch information
Mariatta committed Apr 5, 2023
commit f31265901cd06c575e2dbfec8380bcc52fae9541
45 changes: 42 additions & 3 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import threading
import queue
import concurrent.futures

import atexit

FLUSH_COUNT = 1000
MAX_MUTATIONS = 100000
Expand All @@ -27,6 +27,10 @@ class MaxMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""


class BatcherIsClosedError(ValueError):
    """Raised when a mutation is sent to an already-closed batcher."""


class _MutationsBatchQueue(object):
"""Private Threadsafe Queue to hold rows for batching."""

Expand Down Expand Up @@ -75,6 +79,22 @@ def empty(self):
return self._queue.empty()


def _batcher_is_open(func):
"""Decorator to check if the batcher is open or closed.
:raises:
* :exc:`.batcher.BatcherIsClosedError` if batcher is already
closed

"""

def wrapper(self, *args, **kwargs):
if not self._is_open:
raise BatcherIsClosedError()
func(self, *args, **kwargs)

return wrapper


class MutationsBatcher(object):
"""A MutationsBatcher is used in batch cases where the number of mutations
is large or unknown. It will store DirectRows in memory until one of the
Expand Down Expand Up @@ -105,15 +125,27 @@ class MutationsBatcher(object):
flush. If it reaches the max number of row mutations size it calls
finish_batch() to mutate the current row batch. Default is MAX_ROW_BYTES
(5 MB).

Mariatta marked this conversation as resolved.
Show resolved Hide resolved
:type flush_interval: float
:param flush_interval: (Optional) The interval (in seconds) between asynchronous flushes.
Default is 1 second.
"""

def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
def __init__(
self,
table,
flush_count=FLUSH_COUNT,
max_row_bytes=MAX_ROW_BYTES,
flush_interval=1,
):
self._rows = _MutationsBatchQueue(
max_row_bytes=max_row_bytes, flush_count=flush_count
)
self.table = table
self._executor = concurrent.futures.ThreadPoolExecutor()
threading.Timer(1, self.flush).start()
self._is_open = True
atexit.register(self.close)
threading.Timer(flush_interval, self.flush).start()

@property
def flush_count(self):
Expand All @@ -127,10 +159,15 @@ def __enter__(self):
"""Starting the MutationsBatcher as a context manager"""
return self

@_batcher_is_open
def mutate(self, row):
"""Add a row to the batch. If the current batch meets one of the size
limits, the batch is sent asynchronously.

:raises:
* :exc:`.batcher.BatcherIsClosedError` if batcher is already
closed

For example:

.. literalinclude:: snippets.py
Expand All @@ -154,6 +191,7 @@ def mutate(self, row):
if self._rows.full():
self.flush_async()

@_batcher_is_open
def mutate_rows(self, rows):
"""Add multiple rows to the batch. If the current batch meets one of the size
limits, the batch is sent asynchronously.
Expand Down Expand Up @@ -213,5 +251,6 @@ def __exit__(self, exc_type, exc_value, exc_traceback):

def close(self):
    """Clean up resources. Flush and shutdown the ThreadPoolExecutor.

    The batcher is marked as closed before flushing, so methods guarded
    by ``_batcher_is_open`` raise
    :exc:`.batcher.BatcherIsClosedError` from this point on.

    Any exception raised by :meth:`flush` is propagated, but only after
    the executor has been shut down.
    """
    self._is_open = False
    try:
        self.flush()
    finally:
        # Always release the worker threads, even when the flush fails.
        self._executor.shutdown(wait=True)
        # __init__ registers close() with atexit; drop that registration
        # so a closed batcher is not kept alive until interpreter exit.
        # unregister() is a silent no-op if nothing was registered.
        atexit.unregister(self.close)
15 changes: 15 additions & 0 deletions tests/unit/test_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,21 @@ def test_mutations_batcher_context_manager_flushed_when_closed():
assert table.mutation_calls == 1


def test_mutations_batcher_mutate_after_batcher_closed_raise_error():
    """mutate() on a closed batcher raises BatcherIsClosedError."""
    from google.cloud.bigtable.batcher import BatcherIsClosedError

    table = _Table(TABLE_NAME)
    mutation_batcher = MutationsBatcher(table=table)
    mutation_batcher.close()
    assert table.mutation_calls == 0

    # Closing again is kept from the original test, but outside the
    # pytest.raises block so it cannot satisfy the expected error.
    mutation_batcher.close()

    row = DirectRow(row_key=b"row_key")
    row.set_cell("cf1", b"c1", 1)

    # Only the call under test lives inside the raises context.
    with pytest.raises(BatcherIsClosedError):
        mutation_batcher.mutate(row)


class _Instance(object):
def __init__(self, client=None):
self._client = client
Expand Down