Skip to content

Commit

Permalink
Raise errors from mutations.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mariatta committed Feb 1, 2023
1 parent 25c6265 commit 3e627ce
Showing 1 changed file with 63 additions and 7 deletions.
70 changes: 63 additions & 7 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,22 @@
import concurrent.futures
import atexit

from google.cloud.bigtable.error import Status

FLUSH_COUNT = 100
MAX_MUTATIONS = 100000
MAX_ROW_BYTES = 20 * 1024 * 1024 # 20MB
MAX_BATCH_SIZE = 100 * 1024 * 1024
MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB


class MutationsBatchError(Exception):
"""Error in the batch request"""

def __init__(self, status_codes):
self.status_codes = status_codes
self.message = "Errors in batch mutations."
super().__init__(self.message)


class MaxMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""
Expand Down Expand Up @@ -53,6 +65,7 @@ def put(self, item, block=True, timeout=None):
"""Insert an item to the queue. Recalculate queue size."""

mutation_count = len(item._get_mutations())
mutation_size = item.get_mutations_size()

if mutation_count > MAX_MUTATIONS:
raise MaxMutationsError(
Expand All @@ -61,6 +74,12 @@ def put(self, item, block=True, timeout=None):
)
)

if mutation_size > MAX_MUTATIONS_SIZE:
raise MaxMutationsError(
"The row key {} exceeds the size of mutations {}.".format(
item.row_key, mutation_size
)
)

self._queue.put(item, block=block, timeout=timeout)

Expand Down Expand Up @@ -188,6 +207,8 @@ def mutate(self, row):
match the number of rows that were retried
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
mutations count.
* :exc:`.batcherMutationsBatchError` if there's any error in the
mutations.
"""
self._rows.put(row)

Expand Down Expand Up @@ -216,6 +237,8 @@ def mutate_rows(self, rows):
match the number of rows that were retried
* :exc:`.batcher.MaxMutationsError` if any row exceeds max
mutations count.
* :exc:`.batcherMutationsBatchError` if there's any error in the
mutations.
"""
for row in rows:
self.mutate(row)
Expand All @@ -229,31 +252,64 @@ def flush(self):
:end-before: [END bigtable_api_batcher_flush]
:dedent: 4
raises:
* :exc:`.batcherMutationsBatchError` if there's any error in the
mutations.
"""
rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
self.flush_rows(rows_to_flush)
response = self.flush_rows(rows_to_flush)
return response

def flush_async(self):
"""Sends the current batch to Cloud Bigtable asynchronously."""
"""Sends the current batch to Cloud Bigtable asynchronously.
raises:
* :exc:`.batcherMutationsBatchError` if there's any error in the
mutations.
"""

rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
self._executor.submit(self.flush_rows, rows_to_flush)
future = self._executor.submit(self.flush_rows, rows_to_flush)
# catch the exceptions in the mutation
exc = future.exception()
if exc:
raise exc
else:
result = future.result()
return result

def flush_rows(self, rows_to_flush=None):
"""Mutate the specified rows."""
"""Mutate the specified rows.
raises:
* :exc:`.batcherMutationsBatchError` if there's any error in the
mutations.
"""
response = []
if len(rows_to_flush) > 0:
self.table.mutate_rows(rows_to_flush)
# returns a list of error codes
response = self.table.mutate_rows(rows_to_flush)
if any(isinstance(result, Status) for result in response):
raise MutationsBatchError(status_codes=response)
return response

def __exit__(self, exc_type, exc_value, exc_traceback):
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor."""
self.close()

def close(self):
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor."""
"""Clean up resources. Flush and shutdown the ThreadPoolExecutor.
Any errors will be raised.
raises:
* :exc:`.batcherMutationsBatchError` if there's any error in the
mutations.
"""
self._is_open = False
self.flush()
self._executor.shutdown(wait=True)

0 comments on commit 3e627ce

Please sign in to comment.