Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize row merging #628

Merged
merged 25 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve coverage
  • Loading branch information
igorbernstein2 committed Aug 10, 2022
commit a93efbca87bb5479014a10af32d006e7699b1221
8 changes: 3 additions & 5 deletions google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
_ = (Cell, InvalidChunk, PartialRowData)


class PartialCellData(object):
class PartialCellData(object): # pragma: no cover
"""This class is no longer used and will be removed in the future"""

def __init__(
Expand Down Expand Up @@ -180,8 +180,8 @@ def state(self):
internal_state = self._row_merger.state
if internal_state == _State.ROW_START:
return self.NEW_ROW
elif internal_state in (_State.CELL_START, _State.CELL_COMPLETE):
return self.ROW_IN_PROGRESS
# note: _State.CELL_START, _State.CELL_COMPLETE are transient states
# and will not be visible in between chunks
elif internal_state == _State.CELL_IN_PROGRESS:
return self.CELL_IN_PROGRESS
elif internal_state == _State.ROW_COMPLETE:
Expand Down Expand Up @@ -256,8 +256,6 @@ def __iter__(self):
break

for row in self._row_merger.process_chunks(response):
if self._cancelled:
break
self.last_scanned_row_key = self._row_merger.last_seen_row_key
self._counter += 1

Expand Down
6 changes: 3 additions & 3 deletions google/cloud/bigtable/row_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row."
_MISSING_COLUMN = (
"Column {} is not among the cells stored in this row in the " "column family {}."
"Column {} is not among the cells stored in this row in the column family {}."
)
_MISSING_INDEX = (
"Index {!r} is not valid for the cells stored in this row for column {} "
Expand Down Expand Up @@ -258,7 +258,7 @@ class Cell(object):
def __init__(self, value, timestamp_micros, labels=None):
self.value = value
self.timestamp_micros = timestamp_micros
self.labels = list(labels) if labels is not None else []
self.labels = list(labels) if labels else []

@classmethod
def from_pb(cls, cell_pb):
Expand Down Expand Up @@ -399,7 +399,7 @@ def _handle_cell_start(self, chunk):
if chunk.HasField("qualifier"):
self.row.cell.qualifier = chunk.qualifier.value
if self.row.cell.qualifier is None:
raise InvalidChunk("missing family for a new cell")
raise InvalidChunk("missing qualifier for a new cell")

self.row.cell.timestamp = chunk.timestamp_micros
self.row.cell.labels = chunk.labels
Expand Down
135 changes: 135 additions & 0 deletions tests/unit/test_row_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from google.cloud.bigtable.row_data import PartialRowsData, PartialRowData, InvalidChunk
from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse
from google.cloud.bigtable.row_merger import _RowMerger


# TODO: autogenerate protos from
Expand Down Expand Up @@ -76,3 +77,137 @@ def fake_read(*args, **kwargs):

for expected, actual in zip_longest(test_case.results, actual_results):
assert actual == expected


def test_out_of_order_rows():
    """A response whose last_scanned_row_key is <= the merger's last seen
    row key indicates out-of-order data and must raise InvalidChunk."""
    merger = _RowMerger(last_seen_row=b"z")
    stale_response = ReadRowsResponse(last_scanned_row_key=b"a")
    with pytest.raises(InvalidChunk):
        # process_chunks is a generator; drain it to trigger validation.
        list(merger.process_chunks(stale_response))


def test_bare_rest():
    """A reset_row chunk must be "bare": combining it with any other cell
    field (row key, family, qualifier, timestamp, or value) must raise
    InvalidChunk.

    NOTE(review): the name looks like a typo for ``test_bare_reset`` —
    confirm before renaming.
    """
    # Each case pairs reset_row=True with exactly one extra field.
    # (The original redundantly wrapped each chunk in a second
    # ReadRowsResponse.CellChunk(...) copy-construction; chunks are
    # now passed to _process_chunks directly.)
    for extra_field in (
        {"row_key": b"a"},
        {"family_name": "f"},
        {"qualifier": b"q"},
        {"timestamp_micros": 1000},
        {"value": b"v"},
    ):
        with pytest.raises(InvalidChunk):
            _process_chunks(
                ReadRowsResponse.CellChunk(reset_row=True, **extra_field)
            )


def test_missing_family():
    """A chunk that starts a new cell without naming a column family
    must raise InvalidChunk."""
    # Note: family_name is deliberately omitted.
    chunk = ReadRowsResponse.CellChunk(
        row_key=b"a",
        qualifier=b"q",
        timestamp_micros=1000,
        value=b"v",
        commit_row=True,
    )
    with pytest.raises(InvalidChunk):
        _process_chunks(chunk)


def test_mid_cell_row_key_change():
    """Changing the row key while a cell's value is still streaming
    (value_size=2 promised more bytes) must raise InvalidChunk."""
    opening = ReadRowsResponse.CellChunk(
        row_key=b"a",
        family_name="f",
        qualifier=b"q",
        timestamp_micros=1000,
        value_size=2,
        value=b"v",
    )
    # Continuation illegally switches to row b"b" mid-cell.
    continuation = ReadRowsResponse.CellChunk(
        row_key=b"b", value=b"v", commit_row=True
    )
    with pytest.raises(InvalidChunk):
        _process_chunks(opening, continuation)


def test_mid_cell_family_change():
    """Changing the column family while a cell's value is still streaming
    (value_size=2 promised more bytes) must raise InvalidChunk."""
    with pytest.raises(InvalidChunk):
        _process_chunks(
            ReadRowsResponse.CellChunk(
                row_key=b"a",
                family_name="f",
                qualifier=b"q",
                timestamp_micros=1000,
                value_size=2,
                value=b"v",
            ),
            # BUG FIX: the CellChunk field is ``family_name`` (as used
            # above); the previous ``family=`` kwarg would fail at message
            # construction with the wrong exception, not InvalidChunk.
            ReadRowsResponse.CellChunk(
                family_name="f2", value=b"v", commit_row=True
            ),
        )


def test_mid_cell_qualifier_change():
    """Changing the column qualifier while a cell's value is still streaming
    (value_size=2 promised more bytes) must raise InvalidChunk."""
    opening = ReadRowsResponse.CellChunk(
        row_key=b"a",
        family_name="f",
        qualifier=b"q",
        timestamp_micros=1000,
        value_size=2,
        value=b"v",
    )
    # Continuation illegally switches to qualifier b"q2" mid-cell.
    continuation = ReadRowsResponse.CellChunk(
        qualifier=b"q2", value=b"v", commit_row=True
    )
    with pytest.raises(InvalidChunk):
        _process_chunks(opening, continuation)


def test_mid_cell_timestamp_change():
    """Changing the timestamp while a cell's value is still streaming
    (value_size=2 promised more bytes) must raise InvalidChunk."""
    opening = ReadRowsResponse.CellChunk(
        row_key=b"a",
        family_name="f",
        qualifier=b"q",
        timestamp_micros=1000,
        value_size=2,
        value=b"v",
    )
    # Continuation illegally jumps to a new timestamp mid-cell.
    continuation = ReadRowsResponse.CellChunk(
        timestamp_micros=2000, value=b"v", commit_row=True
    )
    with pytest.raises(InvalidChunk):
        _process_chunks(opening, continuation)


def test_mid_cell_labels_change():
    """Introducing labels while a cell's value is still streaming
    (value_size=2 promised more bytes) must raise InvalidChunk."""
    opening = ReadRowsResponse.CellChunk(
        row_key=b"a",
        family_name="f",
        qualifier=b"q",
        timestamp_micros=1000,
        value_size=2,
        value=b"v",
    )
    # Continuation illegally attaches labels mid-cell.
    continuation = ReadRowsResponse.CellChunk(
        labels=["b"], value=b"v", commit_row=True
    )
    with pytest.raises(InvalidChunk):
        _process_chunks(opening, continuation)


def _process_chunks(*chunks):
    """Feed *chunks* through a fresh _RowMerger and return the merged rows.

    The chunks are wrapped in a ReadRowsResponse and converted to its raw
    protobuf form, which is what process_chunks consumes.
    """
    response_pb = ReadRowsResponse.pb(ReadRowsResponse(chunks=chunks))
    merger = _RowMerger()
    return list(merger.process_chunks(response_pb))