Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize row merging #628

Merged
merged 25 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 41 additions & 165 deletions google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,25 @@
import copy

import grpc # type: ignore

import warnings
from google.api_core import exceptions
from google.api_core import retry
from google.cloud._helpers import _to_bytes # type: ignore

from google.cloud.bigtable.row_merger import _RowMerger, _State
from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2
from google.cloud.bigtable_v2.types import data as data_v2_pb2
from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData


# Some classes need to be re-exported here to keep backwards
# compatibility. Those classes were moved to row_merger, but we dont want to
# break enduser's imports. This hack, ensures they don't get marked as unused.
_ = (Cell, InvalidChunk, PartialRowData)


class PartialCellData(object):
"""Representation of partial cell in a Google Cloud Bigtable Table.

These are expected to be updated directly from a
:class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`

:type row_key: bytes
:param row_key: The key for the row holding the (partial) cell.

:type family_name: str
:param family_name: The family name of the (partial) cell.

:type qualifier: bytes
:param qualifier: The column qualifier of the (partial) cell.

:type timestamp_micros: int
:param timestamp_micros: The timestamp (in microsecods) of the
(partial) cell.

:type labels: list of str
:param labels: labels assigned to the (partial) cell

:type value: bytes
:param value: The (accumulated) value of the (partial) cell.
"""
class PartialCellData(object): # pragma: NO COVER
"""This class is no longer used and will be removed in the future"""

def __init__(
self, row_key, family_name, qualifier, timestamp_micros, labels=(), value=b""
Expand All @@ -69,11 +49,6 @@ def __init__(
self.value = value

def append_value(self, value):
"""Append bytes from a new chunk to value.

:type value: bytes
:param value: bytes to append
"""
self.value += value


Expand Down Expand Up @@ -168,14 +143,7 @@ class PartialRowsData(object):
def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
# Counter for rows returned to the user
self._counter = 0
# In-progress row, unset until first response, after commit/reset
self._row = None
# Last complete row, unset until first commit
self._previous_row = None
# In-progress cell, unset until first response, after completion
self._cell = None
# Last complete cell, unset until first completion, after new row
self._previous_cell = None
self._row_merger = _RowMerger()

# May be cached from previous response
self.last_scanned_row_key = None
Expand All @@ -192,20 +160,35 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
self.response_iterator = read_method(request, timeout=self.retry._deadline + 1)

self.rows = {}
self._state = self.STATE_NEW_ROW

# Flag to stop iteration, for any reason not related to self.retry()
self._cancelled = False

@property
def state(self):
"""State machine state.

:rtype: str
:returns: name of state corresponding to current row / chunk
processing.
def state(self): # pragma: NO COVER
"""
DEPRECATED: this property is deprecated and will be removed in the
future.
"""
return self.read_states[self._state]
warnings.warn(
"`PartialRowsData#state()` is deprecated and will be removed in the future",
DeprecationWarning,
stacklevel=2,
)

# Best effort: try to map internal RowMerger states to old strings for
# backwards compatibility
internal_state = self._row_merger.state
if internal_state == _State.ROW_START:
return self.NEW_ROW
# note: _State.CELL_START, _State.CELL_COMPLETE are transient states
# and will not be visible in between chunks
elif internal_state == _State.CELL_IN_PROGRESS:
return self.CELL_IN_PROGRESS
elif internal_state == _State.ROW_COMPLETE:
return self.NEW_ROW
else:
raise RuntimeError("unexpected internal state: " + self._)

def cancel(self):
"""Cancels the iterator, closing the stream."""
Expand Down Expand Up @@ -241,6 +224,7 @@ def _on_error(self, exc):
if self.last_scanned_row_key:
retry_request = self._create_retry_request()

self._row_merger = _RowMerger(self._row_merger.last_seen_row_key)
self.response_iterator = self.read_method(retry_request)

def _read_next(self):
Expand All @@ -266,125 +250,23 @@ def __iter__(self):
try:
response = self._read_next_response()
except StopIteration:
if self.state != self.NEW_ROW:
raise ValueError("The row remains partial / is not committed.")
self._row_merger.finalize()
break
except InvalidRetryRequest:
self._cancelled = True
break

for chunk in response.chunks:
for row in self._row_merger.process_chunks(response):
self.last_scanned_row_key = self._row_merger.last_seen_row_key
self._counter += 1

yield row

if self._cancelled:
break
self._process_chunk(chunk)
if chunk.commit_row:
self.last_scanned_row_key = self._previous_row.row_key
self._counter += 1
yield self._previous_row

resp_last_key = response.last_scanned_row_key
if resp_last_key and resp_last_key > self.last_scanned_row_key:
self.last_scanned_row_key = resp_last_key

def _process_chunk(self, chunk):
if chunk.reset_row:
self._validate_chunk_reset_row(chunk)
self._row = None
self._cell = self._previous_cell = None
self._state = self.STATE_NEW_ROW
return

self._update_cell(chunk)

if self._row is None:
if (
self._previous_row is not None
and self._cell.row_key <= self._previous_row.row_key
):
raise InvalidChunk()
self._row = PartialRowData(self._cell.row_key)

if chunk.value_size == 0:
self._state = self.STATE_ROW_IN_PROGRESS
self._save_current_cell()
else:
self._state = self.STATE_CELL_IN_PROGRESS

if chunk.commit_row:
if chunk.value_size > 0:
raise InvalidChunk()

self._previous_row = self._row
self._row = None
self._previous_cell = None
self._state = self.STATE_NEW_ROW

def _update_cell(self, chunk):
if self._cell is None:
qualifier = None
if chunk.HasField("qualifier"):
qualifier = chunk.qualifier.value

family = None
if chunk.HasField("family_name"):
family = chunk.family_name.value

self._cell = PartialCellData(
chunk.row_key,
family,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value,
)
self._copy_from_previous(self._cell)
self._validate_cell_data_new_cell()
else:
self._cell.append_value(chunk.value)

def _validate_cell_data_new_cell(self):
cell = self._cell
if not cell.row_key or not cell.family_name or cell.qualifier is None:
raise InvalidChunk()

prev = self._previous_cell
if prev and prev.row_key != cell.row_key:
raise InvalidChunk()

def _validate_chunk_reset_row(self, chunk):
# No reset for new row
_raise_if(self._state == self.STATE_NEW_ROW)

# No reset with other keys
_raise_if(chunk.row_key)
_raise_if(chunk.HasField("family_name"))
_raise_if(chunk.HasField("qualifier"))
_raise_if(chunk.timestamp_micros)
_raise_if(chunk.labels)
_raise_if(chunk.value_size)
_raise_if(chunk.value)
_raise_if(chunk.commit_row)

def _save_current_cell(self):
"""Helper for :meth:`consume_next`."""
row, cell = self._row, self._cell
family = row._cells.setdefault(cell.family_name, {})
qualified = family.setdefault(cell.qualifier, [])
complete = Cell.from_pb(cell)
qualified.append(complete)
self._cell, self._previous_cell = None, cell

def _copy_from_previous(self, cell):
"""Helper for :meth:`consume_next`."""
previous = self._previous_cell
if previous is not None:
if not cell.row_key:
cell.row_key = previous.row_key
if not cell.family_name:
cell.family_name = previous.family_name
# NOTE: ``cell.qualifier`` **can** be empty string.
if cell.qualifier is None:
cell.qualifier = previous.qualifier
# The last response might not have generated any rows, but it
# could've updated last_scanned_row_key
mutianf marked this conversation as resolved.
Show resolved Hide resolved
self.last_scanned_row_key = self._row_merger.last_seen_row_key


class _ReadRowsRequestManager(object):
Expand Down Expand Up @@ -494,9 +376,3 @@ def _start_key_set(row_range):
def _end_key_set(row_range):
"""Helper for :meth:`_filter_row_ranges`"""
return row_range.end_key_open or row_range.end_key_closed


def _raise_if(predicate, *args):
"""Helper for validation methods."""
if predicate:
raise InvalidChunk(*args)
Loading