Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Threaded MutationsBatcher #722

Merged
merged 24 commits into from
May 10, 2023
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Feat: Threaded MutationsBatcher
- Batch mutations in a thread to allow concurrent batching
- Flush the batch every second
  • Loading branch information
Mariatta committed Apr 5, 2023
commit 3010cd5d2b14966179b6b8a114577ef391b149fe
100 changes: 71 additions & 29 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# limitations under the License.

"""User friendly container for Google Cloud Bigtable MutationBatcher."""
import threading
import queue
import concurrent.futures


FLUSH_COUNT = 1000
Expand All @@ -24,6 +27,37 @@ class MaxMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""


class MutationsBatchQueue(queue.Queue):
"""Threadsafe Queue to hold rows for batching"""

def __init__(self, max_row_bytes=MAX_ROW_BYTES, flush_count=FLUSH_COUNT):
    """Create an empty queue with zeroed running totals.

    :param max_row_bytes: Byte-size threshold stored on the instance as
        ``max_row_bytes``.
    :param flush_count: Passed to ``queue.Queue`` as ``maxsize``, i.e. the
        maximum number of rows the queue holds.
    """
    super().__init__(maxsize=flush_count)
    self.max_row_bytes = max_row_bytes
    # Running totals for the rows currently sitting in the queue.
    self.total_size = 0
    self.total_mutation_count = 0

def get(self, block=True, timeout=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would `block` be False?

"""Retrieve an item from the queue. Recalculate queue size."""
row = super().get(block=block, timeout=timeout)
mutation_size = row.get_mutations_size()
self.total_mutation_count -= len(row._get_mutations())
self.total_size -= mutation_size
return row

def put(self, item, block=True, timeout=None):
    """Insert a row into the queue and update the running totals.

    The totals are updated only after the row has actually been enqueued,
    so a put that raises (for example ``queue.Full`` on a non-blocking or
    timed-out call) leaves ``total_size`` and ``total_mutation_count``
    unchanged.

    :param item: The row to enqueue. It must provide
        ``get_mutations_size()`` and ``_get_mutations()``.
    :param block: Passed through to ``queue.Queue.put``.
    :param timeout: Passed through to ``queue.Queue.put``.
    """
    # Measure the row first so a malformed item fails before it is queued.
    mutation_size = item.get_mutations_size()
    mutation_count = len(item._get_mutations())

    super().put(item, block=block, timeout=timeout)

    # Only account for the row once it is really in the queue.
    self.total_size += mutation_size
    self.total_mutation_count += mutation_count

def full(self):
    """Report whether the batch has reached any of its limits.

    :rtype: bool
    :returns: ``True`` if the queued byte size has reached
        ``max_row_bytes``, the queued mutation count has reached
        ``MAX_MUTATIONS``, or the underlying queue already holds
        ``maxsize`` rows; ``False`` otherwise.
    """
    if self.total_size >= self.max_row_bytes:
        return True
    # The mutation count is maintained by put()/get(); check it here so a
    # batch is flushed before it grows past the per-request mutation limit.
    if self.total_mutation_count >= MAX_MUTATIONS:
        return True
    return super().full()


class MutationsBatcher(object):
"""A MutationsBatcher is used in batch cases where the number of mutations
is large or unknown. It will store DirectRows in memory until one of the
Expand Down Expand Up @@ -57,12 +91,17 @@ class MutationsBatcher(object):
"""

def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
self.rows = []
self.total_mutation_count = 0
self.total_size = 0
self.rows = MutationsBatchQueue(
max_row_bytes=max_row_bytes, flush_count=flush_count
)
self.table = table
self.flush_count = flush_count
self.max_row_bytes = max_row_bytes
self.flushed_counter = 0
self.executor = concurrent.futures.ThreadPoolExecutor()
threading.Timer(1, self.flush).start()

def __enter__(self):
    """Enter the ``with`` block; the batcher itself is the managed object.

    :returns: This batcher instance.
    """
    return self

def mutate(self, row):
"""Add a row to the batch. If the current batch meets one of the size
Expand All @@ -86,23 +125,9 @@ def mutate(self, row):
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
mutations count.
"""
mutation_count = len(row._get_mutations())
if mutation_count > MAX_MUTATIONS:
raise MaxMutationsError(
"The row key {} exceeds the number of mutations {}.".format(
row.row_key, mutation_count
)
)

if (self.total_mutation_count + mutation_count) >= MAX_MUTATIONS:
self.flush()

self.rows.append(row)
self.total_mutation_count += mutation_count
self.total_size += row.get_mutations_size()

if self.total_size >= self.max_row_bytes or len(self.rows) >= self.flush_count:
self.flush()
if self.rows.full():
self.flush(in_executor=True)
self.rows.put(row)

def mutate_rows(self, rows):
"""Add multiple rows to the batch. If the current batch meets one of the size
Expand All @@ -129,8 +154,8 @@ def mutate_rows(self, rows):
for row in rows:
self.mutate(row)

def flush(self):
"""Sends the current. batch to Cloud Bigtable.
def flush(self, in_executor=False):
"""Sends the rows in the batch to Cloud Bigtable.
For example:

.. literalinclude:: snippets.py
Expand All @@ -139,8 +164,25 @@ def flush(self):
:dedent: 4

"""
if len(self.rows) != 0:
self.table.mutate_rows(self.rows)
self.total_mutation_count = 0
self.total_size = 0
self.rows = []
rows_to_flush = []
while not self.rows.empty():
rows_to_flush.append(self.rows.get())
if in_executor:
self.executor.submit(self.flush_rows, rows_to_flush)
else:
# running it directly not in thread
self.flush_rows(rows_to_flush)

def flush_rows(self, rows_to_flush=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having both flush() and flush_rows() seems a little confusing, especially since this doesn't seem to be "flushing" from the cache in any way. I could see people calling this with no arguments, thinking it is the main flush function.

Maybe this should be called mutate_rows? Or just made into an internal helper function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is confusing. There is already a public mutate_rows from before, and it behaves differently from this one. I'm making it private, since users aren't expected to call it manually.

"""Mutate the specified rows."""
if len(rows_to_flush) > 0:
Mariatta marked this conversation as resolved.
Show resolved Hide resolved
self.table.mutate_rows(rows_to_flush)

def __exit__(self, exc_type, exc_value, exc_traceback):
    """Leave the ``with`` block by closing the batcher.

    Delegates to ``close()``. Returns ``None``, so an exception raised
    inside the ``with`` block is not suppressed.
    """
    self.close()

def close(self):
    """Flush any remaining rows, then shut down the ThreadPoolExecutor.

    The executor is shut down even when the final flush raises, so its
    worker threads are not leaked. The exception from the flush still
    propagates to the caller.
    """
    try:
        self.flush()
    finally:
        # wait=True: return only after already-submitted flushes finish.
        self.executor.shutdown(wait=True)