
Feat: Threaded MutationsBatcher #722

Merged May 10, 2023 · 24 commits

Changes from 1 commit
Raise errors from mutations.
Mariatta committed Apr 5, 2023
commit 649cd9b10078746f29cad78e6a70ea4571f4adea
70 changes: 63 additions & 7 deletions google/cloud/bigtable/batcher.py
@@ -18,10 +18,22 @@
import concurrent.futures
import atexit

from google.cloud.bigtable.error import Status

FLUSH_COUNT = 100
MAX_MUTATIONS = 100000
MAX_ROW_BYTES = 20 * 1024 * 1024 # 20MB
MAX_BATCH_SIZE = 100 * 1024 * 1024
MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB


class MutationsBatchError(Exception):
"""Error in the batch request"""

def __init__(self, status_codes):
self.status_codes = status_codes
self.message = "Errors in batch mutations."
super().__init__(self.message)


class MaxMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""
@@ -53,6 +65,7 @@ def put(self, item, block=True, timeout=None):
"""Insert an item to the queue. Recalculate queue size."""

mutation_count = len(item._get_mutations())
mutation_size = item.get_mutations_size()

if mutation_count > MAX_MUTATIONS:
Contributor:

What's the MAX_MUTATIONS constraint here? Is it for checking the concurrent requests being sent to Bigtable? I don't think throwing an error is the correct behavior here. Instead we should block on adding more elements to the batcher.

Contributor Author:

This is the max number of mutations for a single row (100,000). In this case I think it should raise an error, since we shouldn't split the mutations for one row.

Contributor Author:

Blocking additional elements from being added to the batcher is already handled by the Python Queue itself. If we try to queue more than 100 elements, put() blocks and nothing is added until items have been popped from the queue.

raise MaxMutationsError(
@@ -61,6 +74,12 @@ def put(self, item, block=True, timeout=None):
)
)

if mutation_size > MAX_MUTATIONS_SIZE:
raise MaxMutationsError(
"The row key {} exceeds the size of mutations {}.".format(
item.row_key, mutation_size
)
)

self._queue.put(item, block=block, timeout=timeout)
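The back-pressure behavior discussed in the thread above (a bounded queue blocking `put()` once it is full, instead of raising) can be sketched with the standard-library `queue.Queue`. The row names and timings below are illustrative, not the library's API:

```python
import queue
import threading
import time

# A bounded Queue throttles producers by blocking put() once maxsize
# items are enqueued; no error is raised, the caller simply waits.
q = queue.Queue(maxsize=2)
q.put("row-1")
q.put("row-2")
assert q.full()

def consumer():
    # Drain one item after a short delay, freeing a slot for the producer.
    time.sleep(0.1)
    q.get()

threading.Thread(target=consumer).start()

start = time.monotonic()
q.put("row-3")  # blocks here until the consumer frees a slot
elapsed = time.monotonic() - start
print(elapsed >= 0.05)  # True: put() waited for space
```

This is the mechanism the author is relying on: the capacity check happens inside the queue, so only per-row limits (mutation count and size) need explicit `raise` statements.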

@@ -188,6 +207,8 @@ def mutate(self, row):
match the number of rows that were retried
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
mutations count.
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.
"""
self._rows.put(row)

@@ -216,6 +237,8 @@ def mutate_rows(self, rows):
match the number of rows that were retried
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
mutations count.
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.
"""
for row in rows:
self.mutate(row)
@@ -229,31 +252,64 @@ def flush(self):
:end-before: [END bigtable_api_batcher_flush]
:dedent: 4

raises:
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.
"""
rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
response = self.flush_rows(rows_to_flush)
return response

def flush_async(self):
"""Sends the current batch to Cloud Bigtable asynchronously.

raises:
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.
"""

rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
future = self._executor.submit(self.flush_rows, rows_to_flush)
# catch the exceptions in the mutation
exc = future.exception()
if exc:
raise exc
Contributor:

Will this throw the original exception or will it wrap it?
If it's the original exception, then the stacktrace will be rooted in the executor. If it's wrapped, then you are leaking implementation details.

In Java we kept the original exception but added a synthetic stacktrace of the caller to help callers diagnose where they called the failed RPC.

HBase does something even hackier and modifies the stacktrace, inserting the caller's stack at the top.

I don't know which approach is idiomatic to Python, but we should be intentional here.

Contributor Author:

This will re-raise the original exception. I will see how to do that in Python.
Can you share a link to how it was done in Java?

Contributor Author:

In this case the exception will have the stack trace all the way to the original spot where it was raised (line 303 below). The exception itself will have a list of individual error codes, so the user can still go through it to find out what failed.

else:
result = future.result()
return result
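The re-raise behavior discussed in the thread above can be checked in isolation: raising the exception object returned by `Future.exception()` preserves the worker's original traceback, so the failure still points into the submitted function. A minimal sketch (`failing_flush` is an illustrative stand-in, not a library function):

```python
import concurrent.futures
import traceback

def failing_flush():
    # Stand-in for a flush that fails inside the executor thread.
    raise RuntimeError("batch failed")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(failing_flush)
    exc = future.exception()  # returns the exception without raising it

try:
    if exc:
        raise exc  # re-raises with the worker's traceback still attached
except RuntimeError as err:
    tb = "".join(traceback.format_exception(type(err), err, err.__traceback__))
    print("failing_flush" in tb)  # True: the trace is rooted in the worker
```

So the original exception is propagated unwrapped, which is what the author describes: callers see both the worker-side stack trace and the list of per-row error codes carried on the exception.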

def flush_rows(self, rows_to_flush=None):
Contributor:

Having both flush() and flush_rows() seems a little confusing. Especially since this doesn't seem to be "flushing" from the cache in any way. I could see people calling this with no arguments thinking it is the main flush function

Maybe this should be called mutate_rows? Or just made into an internal helper function?

Contributor Author:

Agree this is confusing. There is already a public mutate_rows from before, and it has different behavior than this one. I'm changing it to private since user isn't expected to call it manually.

"""Mutate the specified rows.

raises:
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.
"""
response = []
if len(rows_to_flush) > 0:
# returns a list of error codes
response = self.table.mutate_rows(rows_to_flush)
if any(isinstance(result, Status) for result in response):
raise MutationsBatchError(status_codes=response)
Contributor:

I noticed Status codes can be converted into exceptions using google.api_core.exceptions.from_grpc_status. Maybe it would be better to raise those directly, to give the full context, rather than using the raw status codes?

return response
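The per-row error handling above can be sketched in isolation. Note the `Status` class and `GRPC_OK` below are simplified stand-ins for `google.cloud.bigtable.error.Status` and the gRPC OK code, and `_check_response` is a hypothetical helper, not the library's API:

```python
GRPC_OK = 0  # stand-in for the gRPC OK status code

class Status:
    # Simplified stand-in for google.cloud.bigtable.error.Status, which
    # wraps a google.rpc.status_pb2.Status proto with a numeric code.
    def __init__(self, code, message=""):
        self.code = code
        self.message = message

class MutationsBatchError(Exception):
    """Error in the batch request (mirrors the class added in this diff)."""
    def __init__(self, status_codes):
        self.status_codes = status_codes
        super().__init__("Errors in batch mutations.")

def _check_response(response):
    # Hypothetical helper: raise when any per-row status is non-OK, keeping
    # the full status list on the exception so callers can inspect which
    # rows failed and why.
    if any(s.code != GRPC_OK for s in response if isinstance(s, Status)):
        raise MutationsBatchError(status_codes=response)
    return response

statuses = [Status(GRPC_OK), Status(13, "internal error")]
try:
    _check_response(statuses)
except MutationsBatchError as err:
    print([s.code for s in err.status_codes])  # [0, 13]
```

Checking `code != GRPC_OK` (rather than only `isinstance(result, Status)`) also addresses the reviewer's point: successful rows come back as OK statuses too, and each non-OK status could further be mapped to a rich exception for full context.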

def __exit__(self, exc_type, exc_value, exc_traceback):
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor."""
self.close()

def close(self):
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor.
Any errors will be raised.

raises:
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.

"""
self._is_open = False
self.flush()
self._executor.shutdown(wait=True)
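The `__exit__`/`close()` pairing above is the standard context-manager lifecycle: flush pending work, then shut down the pool, so any batch error surfaces before the `with` block exits. A minimal stand-in (`TinyBatcher` is illustrative, not the library class):

```python
import concurrent.futures

class TinyBatcher:
    # Sketch of the lifecycle: __exit__ delegates to close(), which
    # flushes pending rows and then shuts down the executor.
    def __init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
        self._pending = []
        self.flushed = []

    def __enter__(self):
        return self

    def mutate(self, row):
        self._pending.append(row)

    def flush(self):
        # Here a real batcher would call table.mutate_rows and raise
        # MutationsBatchError on any non-OK per-row status.
        self.flushed.extend(self._pending)
        self._pending.clear()

    def close(self):
        self.flush()
        self._executor.shutdown(wait=True)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

with TinyBatcher() as batcher:
    batcher.mutate("row-1")
    batcher.mutate("row-2")
print(batcher.flushed)  # ['row-1', 'row-2']: flushed on exit
```

Using `shutdown(wait=True)` matters: it blocks until in-flight `flush_rows` futures finish, so no mutations are silently dropped when the batcher is closed.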