Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Threaded MutationsBatcher #722

Merged
merged 24 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use from_grpc_status
Add comments
  • Loading branch information
Mariatta committed Apr 5, 2023
commit 70f1cc8040a651b225122b641f40b898a76481ad
29 changes: 21 additions & 8 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,31 @@
import concurrent.futures
import atexit

from google.api_core.exceptions import from_grpc_status


# Max number of items in the queue. Queue will be flushed if this number is reached
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these variable names are a bit confusing. Maybe we can refactor them to be:

batch_element_count # After this many elements are accumulated, they will be wrapped up in a batch and sent.
batch_byte_size # After this many bytes are accumulated, they will be wrapped up in a batch and sent.
max_outstanding_elements # After these many elements are sent, block until the previous batch is processed
max_outstanding_bytes # After these many bytes are sent, block until the previous batch is processed.

what do you think?

FLUSH_COUNT = 100

# Max number of mutations for a single row at one time
MAX_MUTATIONS = 100000

# Max size (in bytes) for a single mutation
MAX_ROW_BYTES = 20 * 1024 * 1024 # 20MB

# Max size (in bytes) for a single request
MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB


class MutationsBatchError(Exception):
    """Raised when one or more mutations in a batch request failed.

    Wraps the per-row gRPC status codes returned by a batch mutation and
    translates every non-OK status into the corresponding
    ``google.api_core`` exception.

    :type status_codes: list
    :param status_codes: gRPC ``Status`` objects, one per mutated row.
    """

    def __init__(self, status_codes):
        self.status_codes = status_codes
        # Keep only the failures: a gRPC status code of 0 means OK, so
        # every other code is mapped to its api_core exception class.
        failed = [status for status in status_codes if status.code != 0]
        self.errors = [
            from_grpc_status(status.code, status.message) for status in failed
        ]
        self.message = "Errors in batch mutations."
        super().__init__(self.message)

Expand Down Expand Up @@ -129,8 +142,7 @@ class MutationsBatcher(object):
memory will not necessarily be sent to the service, even after the
completion of the mutate() method.

TODO: Performance would dramatically improve if this class had the
capability of asynchronous, parallel RPCs.
Note on thread safety: The same MutationBatcher cannot be shared by multiple end-user threads.

:type table: class
:param table: class:`~google.cloud.bigtable.table.Table`.
Expand Down Expand Up @@ -276,7 +288,7 @@ def flush_async(self):
# catch the exceptions in the mutation
exc = future.exception()
if exc:
Mariatta marked this conversation as resolved.
Show resolved Hide resolved
raise exc
raise exc from exc
else:
result = future.result()
return result
Expand All @@ -288,20 +300,21 @@ def flush_rows(self, rows_to_flush=None):
* :exc:`.batcher.MutationsBatchError` if there's any error in the
mutations.
"""
response = []
responses = []
if len(rows_to_flush) > 0:
Mariatta marked this conversation as resolved.
Show resolved Hide resolved
# returns a list of status codes
response = self.table.mutate_rows(rows_to_flush)
responses = []

has_error = False
for result in response:
if result.code != 0:
has_error = True
responses.append(result)

if has_error:
raise MutationsBatchError(status_codes=response)
return response
raise MutationsBatchError(status_codes=responses)

return responses

def __exit__(self, exc_type, exc_value, exc_traceback):
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor."""
Expand Down
14 changes: 10 additions & 4 deletions tests/unit/test_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def test_mutations_batcher_flush_interval(mocked_flush):
def test_mutations_batcher_response_with_error_codes():
from google.rpc.status_pb2 import Status

mocked_response = [Status(code=0), Status(code=1)]
mocked_response = [Status(code=1), Status(code=5)]

with mock.patch("tests.unit.test_batcher._Table") as mocked_table:
table = mocked_table.return_value
Expand All @@ -243,13 +243,16 @@ def test_mutations_batcher_response_with_error_codes():
with pytest.raises(MutationsBatchError) as exc:
mutation_batcher.flush()
assert exc.value.message == "Errors in batch mutations."
assert exc.value.status_codes == mocked_response
assert len(exc.value.errors) == 2

assert exc.value.errors[0].message == mocked_response[0].message
assert exc.value.errors[1].message == mocked_response[1].message


def test_mutations_batcher_flush_async_raises_exception():
from google.rpc.status_pb2 import Status

mocked_response = [Status(code=0), Status(code=1)]
mocked_response = [Status(code=1, message="err1"), Status(code=5, message="err5")]

with mock.patch("tests.unit.test_batcher._Table") as mocked_table:
table = mocked_table.return_value
Expand All @@ -263,7 +266,10 @@ def test_mutations_batcher_flush_async_raises_exception():
with pytest.raises(MutationsBatchError) as exc:
mutation_batcher.flush_async()
assert exc.value.message == "Errors in batch mutations."
assert exc.value.status_codes == mocked_response
assert len(exc.value.errors) == 2

assert exc.value.errors[0].message == mocked_response[0].message
assert exc.value.errors[1].message == mocked_response[1].message


class _Instance(object):
Expand Down