Skip to content

Commit

Permalink
Changes based on PR review
Browse files Browse the repository at this point in the history
- Remove debugging variable
- Update variable names
  • Loading branch information
Mariatta committed Apr 5, 2023
1 parent c0584b5 commit 9899e45
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
26 changes: 12 additions & 14 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT
self._queue = queue.Queue()
self.total_mutation_count = 0
self.total_size = 0
self.max_row_bytes = max_row_bytes
self.max_mutation_bytes = max_mutation_bytes
self.flush_count = flush_count

def get(self):
Expand All @@ -74,7 +74,7 @@ def full(self):
"""Check if the queue is full."""
if (
self.total_mutation_count >= self.flush_count
or self.total_size >= self.max_row_bytes
or self.total_size >= self.max_mutation_bytes
):
return True
return False
Expand All @@ -94,7 +94,9 @@ class BatchInfo:

class FlowControl(object):
def __init__(
self, max_mutations=MAX_OUTSTANDING_ELEMENTS, max_row_bytes=MAX_OUTSTANDING_BYTES
self,
max_mutations=MAX_OUTSTANDING_ELEMENTS,
max_mutation_bytes=MAX_OUTSTANDING_BYTES,
):
"""Control the inflight requests. Keep track of the mutations, row bytes and row counts.
As requests to backend are being made, adjust the number of mutations being processed.
Expand All @@ -103,22 +105,21 @@ def __init__(
Reopen the flow as requests are finished.
"""
self.max_mutations = max_mutations
self.max_row_bytes = max_row_bytes
self.max_mutation_bytes = max_mutation_bytes
self.inflight_mutations = 0
self.inflight_size = 0
self.inflight_rows_count = 0 # TODO: FOR DEBUG, DELETE before merging
self.event = threading.Event()
self.event.set()

def is_blocked(self):
"""Returns True if:
- inflight mutations >= max_mutations, or
- inflight bytes size >= max_row_bytes, or
- inflight bytes size >= max_mutation_bytes, or
"""

return (
self.inflight_mutations >= self.max_mutations
or self.inflight_size >= self.max_row_bytes
or self.inflight_size >= self.max_mutation_bytes
)

def control_flow(self, batch_info):
Expand All @@ -128,8 +129,6 @@ def control_flow(self, batch_info):

self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
self.inflight_rows_count += batch_info.rows_count

self.set_flow_control_status()
self.wait()

Expand Down Expand Up @@ -157,7 +156,6 @@ def release(self, batch_info):
"""
self.inflight_mutations -= batch_info.mutations_count
self.inflight_size -= batch_info.mutations_size
self.inflight_rows_count -= batch_info.rows_count
self.set_flow_control_status()


Expand Down Expand Up @@ -204,7 +202,7 @@ def __init__(
flush_interval=1,
):
self._rows = _MutationsBatchQueue(
max_row_bytes=max_row_bytes, flush_count=flush_count
max_mutation_bytes=max_row_bytes, flush_count=flush_count
)
self.table = table
self._executor = concurrent.futures.ThreadPoolExecutor()
Expand All @@ -213,7 +211,7 @@ def __init__(
self._timer.start()
self.flow_control = FlowControl(
max_mutations=MAX_OUTSTANDING_ELEMENTS,
max_row_bytes=MAX_OUTSTANDING_BYTES,
max_mutation_bytes=MAX_OUTSTANDING_BYTES,
)
self.futures_mapping = {}
self.exceptions = queue.Queue()
Expand All @@ -224,7 +222,7 @@ def flush_count(self):

@property
def max_row_bytes(self):
return self._rows.max_row_bytes
return self._rows.max_mutation_bytes

def __enter__(self):
"""Starting the MutationsBatcher as a context manager"""
Expand Down Expand Up @@ -325,7 +323,7 @@ def flush_async(self):
rows_count >= self.flush_count
or mutations_size >= self.max_row_bytes
or mutations_count >= self.flow_control.max_mutations
or mutations_size >= self.flow_control.max_row_bytes
or mutations_size >= self.flow_control.max_mutation_bytes
or self._rows.empty() # submit when it reached the end of the queue
):
self.flow_control.control_flow(batch_info)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def test_flow_control_event_is_set_when_not_blocked():
def test_flow_control_event_is_not_set_when_blocked():
flow_control = FlowControl()
flow_control.inflight_mutations = flow_control.max_mutations
flow_control.inflight_size = flow_control.max_row_bytes
flow_control.inflight_size = flow_control.max_mutation_bytes

flow_control.set_flow_control_status()
assert not flow_control.event.is_set()
Expand Down

0 comments on commit 9899e45

Please sign in to comment.