Feat: Threaded MutationsBatcher #722

Merged: 24 commits, May 10, 2023
Changes from 1 commit
Unit test adjustments
Mariatta committed Apr 5, 2023
commit 148d1c0660a5dc16f42745619d71f649c8461645
8 changes: 5 additions & 3 deletions google/cloud/bigtable/batcher.py
@@ -129,7 +129,7 @@ def __enter__(self):

def mutate(self, row):
"""Add a row to the batch. If the current batch meets one of the size
limits, the batch is sent synchronously.
limits, the batch is sent asynchronously.

For example:

@@ -156,7 +156,7 @@ def mutate(self, row):

def mutate_rows(self, rows):
"""Add multiple rows to the batch. If the current batch meets one of the size
limits, the batch is sent synchronously.
limits, the batch is sent asynchronously.

For example:

@@ -180,7 +180,7 @@ def mutate_rows(self, rows):
self.mutate(row)

def flush(self):
"""Sends the current. batch to Cloud Bigtable.
"""Sends the current batch to Cloud Bigtable synchronously.
For example:

.. literalinclude:: snippets.py
@@ -195,6 +195,8 @@ def flush(self):
self.flush_rows(rows_to_flush)

def flush_async(self):
"""Sends the current batch to Cloud Bigtable asynchronously."""

rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
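The docstring updates above spell out the batcher's contract: `mutate()` and `mutate_rows()` enqueue rows and trigger an asynchronous send once a size limit (e.g. `flush_count`, `max_row_bytes`, or `MAX_MUTATIONS`) is reached, while `flush()` sends the queued batch synchronously and `flush_async()` sends it asynchronously. A minimal usage sketch follows; the client, instance, and table IDs are placeholders, not part of this diff.

```python
# Minimal usage sketch based on the docstrings in this diff.
# The project/instance/table IDs are placeholders, not part of this PR.
from google.cloud.bigtable import Client
from google.cloud.bigtable.batcher import MutationsBatcher
from google.cloud.bigtable.row import DirectRow

client = Client(project="my-project")
table = client.instance("my-instance").table("my-table")

# As a context manager, the batcher flushes any pending rows when the block exits.
with MutationsBatcher(table, flush_count=100, max_row_bytes=5 * 1024 * 1024) as batcher:
    row = DirectRow(row_key=b"row-key-1")
    row.set_cell("cf1", b"c1", b"value")
    batcher.mutate(row)  # queued; sent asynchronously once a size limit is hit
    batcher.flush()      # or force a synchronous send of whatever is queued
```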
134 changes: 71 additions & 63 deletions tests/unit/test_batcher.py
@@ -17,77 +17,68 @@
import pytest

from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.batcher import MutationsBatcher

TABLE_ID = "table-id"
TABLE_NAME = "/tables/" + TABLE_ID


def _make_mutation_batcher(table, **kw):
from google.cloud.bigtable.batcher import MutationsBatcher

return MutationsBatcher(table, **kw)


def test_mutation_batcher_constructor():
table = _Table(TABLE_NAME)

mutation_batcher = _make_mutation_batcher(table)
assert table is mutation_batcher.table
with MutationsBatcher(table) as mutation_batcher:
assert table is mutation_batcher.table


def test_mutation_batcher_mutate_row():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(table=table)
with MutationsBatcher(table=table) as mutation_batcher:

rows = [
DirectRow(row_key=b"row_key"),
DirectRow(row_key=b"row_key_2"),
DirectRow(row_key=b"row_key_3"),
DirectRow(row_key=b"row_key_4"),
]
rows = [
DirectRow(row_key=b"row_key"),
DirectRow(row_key=b"row_key_2"),
DirectRow(row_key=b"row_key_3"),
DirectRow(row_key=b"row_key_4"),
]

mutation_batcher.mutate_rows(rows)
mutation_batcher.flush()
mutation_batcher.mutate_rows(rows)

assert table.mutation_calls == 1


def test_mutation_batcher_mutate():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(table=table)
with MutationsBatcher(table=table) as mutation_batcher:

row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", 1)
row.set_cell("cf1", b"c2", 2)
row.set_cell("cf1", b"c3", 3)
row.set_cell("cf1", b"c4", 4)
row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", 1)
row.set_cell("cf1", b"c2", 2)
row.set_cell("cf1", b"c3", 3)
row.set_cell("cf1", b"c4", 4)

mutation_batcher.mutate(row)

mutation_batcher.flush()
mutation_batcher.mutate(row)

assert table.mutation_calls == 1


def test_mutation_batcher_flush_w_no_rows():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(table=table)
mutation_batcher.flush()
with MutationsBatcher(table=table) as mutation_batcher:
mutation_batcher.flush()

assert table.mutation_calls == 0


def test_mutation_batcher_mutate_w_max_flush_count():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(table=table, flush_count=3)
with MutationsBatcher(table=table, flush_count=3) as mutation_batcher:

row_1 = DirectRow(row_key=b"row_key_1")
row_2 = DirectRow(row_key=b"row_key_2")
row_3 = DirectRow(row_key=b"row_key_3")
row_1 = DirectRow(row_key=b"row_key_1")
row_2 = DirectRow(row_key=b"row_key_2")
row_3 = DirectRow(row_key=b"row_key_3")

mutation_batcher.mutate(row_1)
mutation_batcher.mutate(row_2)
mutation_batcher.mutate(row_3)
mutation_batcher.mutate(row_1)
mutation_batcher.mutate(row_2)
mutation_batcher.mutate(row_3)

assert table.mutation_calls == 1

@@ -97,58 +88,55 @@ def test_mutation_batcher_mutate_with_max_mutations_failure():
from google.cloud.bigtable.batcher import MaxMutationsError

table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(table=table)
with MutationsBatcher(table=table) as mutation_batcher:

row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", 1)
row.set_cell("cf1", b"c2", 2)
row.set_cell("cf1", b"c3", 3)
row.set_cell("cf1", b"c4", 4)
row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", 1)
row.set_cell("cf1", b"c2", 2)
row.set_cell("cf1", b"c3", 3)
row.set_cell("cf1", b"c4", 4)

with pytest.raises(MaxMutationsError):
mutation_batcher.mutate(row)
with pytest.raises(MaxMutationsError):
mutation_batcher.mutate(row)


@mock.patch("google.cloud.bigtable.batcher.MAX_MUTATIONS", new=3)
def test_mutation_batcher_mutate_w_max_mutations():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(table=table)
with MutationsBatcher(table=table) as mutation_batcher:

row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", 1)
row.set_cell("cf1", b"c2", 2)
row.set_cell("cf1", b"c3", 3)
row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", 1)
row.set_cell("cf1", b"c2", 2)
row.set_cell("cf1", b"c3", 3)

mutation_batcher.mutate(row)
mutation_batcher.flush()
mutation_batcher.mutate(row)

assert table.mutation_calls == 1


def test_mutation_batcher_mutate_w_max_row_bytes():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(
with MutationsBatcher(
table=table, max_row_bytes=3 * 1024 * 1024
)
) as mutation_batcher:

number_of_bytes = 1 * 1024 * 1024
max_value = b"1" * number_of_bytes
number_of_bytes = 1 * 1024 * 1024
max_value = b"1" * number_of_bytes

row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", max_value)
row.set_cell("cf1", b"c2", max_value)
row.set_cell("cf1", b"c3", max_value)
row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", max_value)
row.set_cell("cf1", b"c2", max_value)
row.set_cell("cf1", b"c3", max_value)

mutation_batcher.mutate(row)
mutation_batcher.mutate(row)

assert table.mutation_calls == 1


def test_mutations_batcher_flushed_when_closed():
table = _Table(TABLE_NAME)
mutation_batcher = _make_mutation_batcher(
table=table, max_row_bytes=3 * 1024 * 1024
)
mutation_batcher = MutationsBatcher(table=table, max_row_bytes=3 * 1024 * 1024)

number_of_bytes = 1 * 1024 * 1024
max_value = b"1" * number_of_bytes
@@ -158,11 +146,31 @@ def test_mutations_batcher_flushed_when_closed():
row.set_cell("cf1", b"c2", max_value)

mutation_batcher.mutate(row)
assert table.mutation_calls == 0

mutation_batcher.close()

assert table.mutation_calls == 1


def test_mutations_batcher_context_manager_flushed_when_closed():
table = _Table(TABLE_NAME)
with MutationsBatcher(
table=table, max_row_bytes=3 * 1024 * 1024
) as mutation_batcher:

number_of_bytes = 1 * 1024 * 1024
max_value = b"1" * number_of_bytes

row = DirectRow(row_key=b"row_key")
row.set_cell("cf1", b"c1", max_value)
row.set_cell("cf1", b"c2", max_value)

mutation_batcher.mutate(row)

assert table.mutation_calls == 1


class _Instance(object):
def __init__(self, client=None):
self._client = client
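The last two tests above check that queued rows are flushed when the batcher is closed, either explicitly via `close()` or implicitly when the `with` block exits. The `_Table`/`_Instance` fixtures they rely on are collapsed in this view; the sketch below is a hypothetical stand-in, not the real fixture, illustrating only the shape the assertions depend on: a `mutation_calls` counter incremented once per flushed batch.

```python
# Hypothetical stand-in for the collapsed _Table fixture; the real class in
# tests/unit/test_batcher.py may differ in its details.
from google.cloud.bigtable.batcher import MutationsBatcher
from google.cloud.bigtable.row import DirectRow


class _FakeTable:
    def __init__(self, name):
        self.name = name
        self.mutation_calls = 0

    def mutate_rows(self, rows, **kwargs):
        # The batcher delivers each flushed batch via table.mutate_rows();
        # the tests only count how many times that happens.
        self.mutation_calls += 1
        return [None] * len(rows)


def test_flushed_on_context_exit():
    table = _FakeTable("/tables/table-id")
    with MutationsBatcher(table=table) as batcher:
        row = DirectRow(row_key=b"row_key")
        row.set_cell("cf1", b"c1", 1)
        batcher.mutate(row)
    assert table.mutation_calls == 1  # pending row flushed when the block exited
```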