Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Threaded MutationsBatcher #722

Merged
merged 24 commits into from
May 10, 2023
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make _MutationsBatchQueue private
  • Loading branch information
Mariatta committed Apr 5, 2023
commit dbf80c37975580320b17b097602014c29b83e314
35 changes: 20 additions & 15 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ class MaxMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""


class MutationsBatchQueue(queue.Queue):
"""Threadsafe Queue to hold rows for batching"""
class _MutationsBatchQueue(object):
"""Private Threadsafe Queue to hold rows for batching."""

def __init__(self, max_row_bytes=MAX_ROW_BYTES, flush_count=FLUSH_COUNT):
mutianf marked this conversation as resolved.
Show resolved Hide resolved
mutianf marked this conversation as resolved.
Show resolved Hide resolved
"""Specify the queue constraints"""
self._queue = queue.Queue(maxsize=flush_count)
self.total_mutation_count = 0
self.total_size = 0
self.max_row_bytes = max_row_bytes
super().__init__(maxsize=flush_count)

def get(self, block=True, timeout=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would block be False?

"""Retrieve an item from the queue. Recalculate queue size."""
row = super().get(block=block, timeout=timeout)
row = self._queue.get(block=block, timeout=timeout)
mutation_size = row.get_mutations_size()
self.total_mutation_count -= len(row._get_mutations())
self.total_size -= mutation_size
Expand All @@ -49,14 +49,17 @@ def put(self, item, block=True, timeout=None):
"""Insert an item to the queue. Recalculate queue size."""
self.total_size += item.get_mutations_size()
self.total_mutation_count += len(item._get_mutations())
super().put(item, block=block, timeout=timeout)
self._queue.put(item, block=block, timeout=timeout)

def full(self):
"""Check if the queue is full."""
if self.total_size >= self.max_row_bytes or super().full():
if self.total_size >= self.max_row_bytes or self._queue.full():
return True
return False

def empty(self):
"""Check if the queue is empty."""
return self._queue.empty()


class MutationsBatcher(object):
"""A MutationsBatcher is used in batch cases where the number of mutations
Expand Down Expand Up @@ -91,7 +94,7 @@ class MutationsBatcher(object):
"""

def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
self.rows = MutationsBatchQueue(
self.rows = _MutationsBatchQueue(
max_row_bytes=max_row_bytes, flush_count=flush_count
)
self.table = table
Expand Down Expand Up @@ -126,7 +129,7 @@ def mutate(self, row):
mutations count.
"""
if self.rows.full():
self.flush(in_executor=True)
self.flush_async()
self.rows.put(row)

def mutate_rows(self, rows):
Expand Down Expand Up @@ -154,8 +157,8 @@ def mutate_rows(self, rows):
for row in rows:
self.mutate(row)

def flush(self, in_executor=False):
"""Sends the rows in the batch to Cloud Bigtable.
def flush(self):
"""Sends the current batch to Cloud Bigtable.
For example:

.. literalinclude:: snippets.py
Expand All @@ -167,11 +170,13 @@ def flush(self, in_executor=False):
rows_to_flush = []
while not self.rows.empty():
rows_to_flush.append(self.rows.get())
if in_executor:
self.executor.submit(self.flush_rows, rows_to_flush)
else:
# running it directly not in thread
self.flush_rows(rows_to_flush)
self.flush_rows(rows_to_flush)

def flush_async(self):
"""Drain the queued rows and submit them to the executor for flushing.

Every row currently held in ``self.rows`` is removed and collected into a
list, which is then passed to ``self.flush_rows`` via ``self.executor.submit``.
"""
rows_to_flush = []
while not self.rows.empty():
rows_to_flush.append(self.rows.get())
self.executor.submit(self.flush_rows, rows_to_flush)

def flush_rows(self, rows_to_flush=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having both flush() and flush_rows() seems a little confusing, especially since this doesn't seem to be "flushing" from the cache in any way. I could see people calling this with no arguments, thinking it is the main flush function.

Maybe this should be called mutate_rows? Or just made into an internal helper function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this is confusing. There is already a public mutate_rows from before, and it behaves differently from this one. I'm making it private, since users aren't expected to call it manually.

"""Mutate the specified rows."""
Expand Down