Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Threaded MutationsBatcher #722

Merged
merged 24 commits into from
May 10, 2023
Merged

Feat: Threaded MutationsBatcher #722

merged 24 commits into from
May 10, 2023

Conversation

Mariatta
Copy link
Contributor

@Mariatta Mariatta commented Jan 12, 2023

  • Batch mutations in a thread to allow concurrent batching
  • Flush the batch every second
  • Flow control
  • Batcher can now be used as a contextmanager

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@product-auto-label product-auto-label bot added size: m Pull request size is medium. api: bigtable Issues related to the googleapis/python-bigtable API. size: l Pull request size is large. and removed size: m Pull request size is medium. labels Jan 12, 2023
@Mariatta Mariatta marked this pull request as ready for review January 17, 2023 19:28
@Mariatta Mariatta requested review from a team as code owners January 17, 2023 19:28

mutation_count = len(item._get_mutations())

if mutation_count > MAX_MUTATIONS:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the Max_mutation constraint here? Is it for checking the concurrent requests that's sending to bigtable? I don't think throwing an error is the correct behavior here. Instead we should block on adding more elements to the batcher.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the max number of mutations for a single row. (100,000) In this case I think it should raise an error since we shouldn't split the mutations for one row?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking additional elements to the batcher is already done by the use of Python Queue itself. If we're trying to queue more than 100 elements, it will be blocked and won't be added until items have been popped from the queue.

google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
Copy link
Contributor

@igorbernstein2 igorbernstein2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we don't really have any flow control here. Do you plan on adding it in a follow up PR?

Comment on lines 18 to 25
FLUSH_COUNT = 1000
FLUSH_COUNT = 100
MAX_MUTATIONS = 100000
MAX_ROW_BYTES = 5242880 # 5MB
MAX_ROW_BYTES = 20 * 1024 * 1024 # 20MB
MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments what these constants control

self.total_size = 0
self.max_row_bytes = max_row_bytes

def get(self, block=True, timeout=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would block = False?

google/cloud/bigtable/batcher.py Show resolved Hide resolved
Comment on lines 277 to 279
exc = future.exception()
if exc:
raise exc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this throw the original exception or will it wrap it?
If its the original exception, then the stacktrace will be rooted in the executor. If its wrapped then you are leaking implementation details.

In java we kept the original exception but added a synthetic stacktrace of the caller to help callers diagnose where they called the failed RPC.

HBase does something even hackier and modifies the stacktrace and inserts the callers stack at the top.

I dont know which approach idiomatic to python, but we should be intentional here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will re-raise the original exception. I will see how to do that in Python.
Can you share link on how it was done in Java?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it the exception will have the stack trace all the way to the original spot where it was raised (line 303 below). The exception itself will have a list of individual error codes so user can still go through it to find out what failed.

responses.append(result)

if has_error:
raise MutationsBatchError(status_codes=response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed Status codes can be converted into exceptions using google.api_core.exceptions.from_grpc_status. Maybe it would be better to raise those directly, to give the full context, rather than using the raw status codes?


FLUSH_COUNT = 1000

# Max number of items in the queue. Queue will be flushed if this number is reached
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these variable names are a bit confusing. Maybe we can refactor them to be:

batch_element_count # After this many elements are accumulated, they will be wrapped up in a batch and sent.
batch_byte_size # After this many bytes are accumulated, they will be wrapped up in a batch and sent.
max_outstanding_elements # After these many elements are sent, block until the previous batch is processed
max_outstanding_bytes # After these many bytes are sent, block until the previous batch is processed.

what do you think?

if (
self.total_mutation_count >= MAX_MUTATIONS
or self.total_size >= self.max_row_bytes
or self._queue.full()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to check if the queue is full? In the batcher code, we flush when the Batcher is full:

if self._rows.full():
    self.flush_async()

I'm not sure if the behavior is still correct?

I think full() should only check for 2 things:

  • Number of elements reached the batch element threshold
  • Number of bytes reached the batch bytes threshold

The queue is only be used to block user from adding more elements when it's full. We don't trigger another flush when the queue is full.

google/cloud/bigtable/batcher.py Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Show resolved Hide resolved
mutations_count = 0
mutations_size = 0
rows_count = 0
batch_info = BatchInfo()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused by BatchInfo. Doesn't it duplicate the row_count, mutations_count and mutations_size variables?

Copy link
Contributor Author

@Mariatta Mariatta Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's similar info, but for different "buckets".

There are two "queues":

self._rows: for storing the rows we want to mutate
batch_info: for storing info about the rows that are being mutated, and we're waiting for the result/response from backend. This gets passed to the the batch_completed_callback. That's where we can release the flow control.

self.flow_control.release(processed_rows)
del self.futures_mapping[future]

def flush_rows(self, rows_to_flush=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having both flush() and flush_rows() seems a little confusing. Especially since this doesn't seem to be "flushing" from the cache in any way. I could see people calling this with no arguments thinking it is the main flush function

Maybe this should be called mutate_rows? Or just made into an internal helper function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree this is confusing. There is already a public mutate_rows from before, and it has different behavior than this one. I'm changing it to private since user isn't expected to call it manually.

google/cloud/bigtable/batcher.py Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved

MAX_ROW_BYTES = 20 * 1024 * 1024 # 20MB # after this many bytes, send out the batch

MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB # max inflight byte size.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this variable to indicate it's for flow control.

Suggested change
MAX_MUTATIONS_SIZE = 100 * 1024 * 1024 # 100MB # max inflight byte size.
MAX_OUTSTANDING_BYTES = 100 * 1024 * 1024 # 100MB # max inflight byte size.


class FlowControl(object):
def __init__(self, max_mutations=MAX_MUTATIONS_SIZE, max_row_bytes=MAX_ROW_BYTES):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defaults should be:

Suggested change
def __init__(self, max_mutations=MAX_MUTATIONS_SIZE, max_row_bytes=MAX_ROW_BYTES):
def __init__(self, max_mutations=MAX_OUTSTANDING_ELEMNTS, max_row_bytes=MAX_MUTATION_SIZE):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And these numbers are:
MAX_OUTSTANDING_ELEMENTS = 100000
MAX_MUTATION_SIZE = 20 MB
correct?


self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
self.inflight_rows_count += batch_info.rows_count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we care about this, it's also not used in is_blocked

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using this for debugging. I will remove it in the end before merging.

google/cloud/bigtable/batcher.py Show resolved Hide resolved
google/cloud/bigtable/batcher.py Show resolved Hide resolved
@Mariatta
Copy link
Contributor Author

I've made adjustments based on previews reviews and feedback. Please take another look.

@Mariatta Mariatta added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Mar 29, 2023
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Mar 29, 2023
Copy link
Contributor

@mutianf mutianf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with some nits!

google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
google/cloud/bigtable/batcher.py Outdated Show resolved Hide resolved
@Mariatta Mariatta mentioned this pull request Apr 5, 2023
4 tasks
@Mariatta Mariatta requested a review from a team as a code owner April 5, 2023 22:44
Mariatta and others added 8 commits April 5, 2023 16:07
Co-authored-by: Mattie Fu <mattiefu@google.com>
- Remove unneeded error
- Make some functions internal
Co-authored-by: Mattie Fu <mattiefu@google.com>
Co-authored-by: Mattie Fu <mattiefu@google.com>
- Remove debugging variable
- Update variable names
@Mariatta Mariatta added the snippet-bot:force-run Force snippet-bot runs its logic label Apr 6, 2023
@snippet-bot snippet-bot bot removed the snippet-bot:force-run Force snippet-bot runs its logic label Apr 6, 2023
@Mariatta Mariatta added the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Apr 6, 2023
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Apr 6, 2023
google/cloud/bigtable/batcher.py Show resolved Hide resolved
mutations_size: int = 0


class FlowControl(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be prefixed with an underscore since this is an internal class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add that in the next commit.

Comment on lines 131 to 134
self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
self.set_flow_control_status()
self.wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this cause a deadlock with a large row? if max_inflight_bytes is 2 and the row size is 4, this will just get stuck?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adjusted the logic, moving the wait to the flush_async function. If the batch is causing the event to be blocked, then it will be sent through, but the subsequent thread will be blocked and waited.

google/cloud/bigtable/batcher.py Show resolved Hide resolved
Copy link
Contributor

@igorbernstein2 igorbernstein2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but please have Daniel take a look at the api and confirm it works for him in the async client

@daniel-sanche
Copy link
Contributor

Some of the API will need to change for the async client, due to different model classes and asyncio patterns, but the general shape of the solution should be mostly consistent. LGTM

@Mariatta Mariatta merged commit 7521a61 into main May 10, 2023
16 checks passed
@Mariatta Mariatta deleted the batcher-threaded branch May 10, 2023 22:42
mutianf added a commit that referenced this pull request May 11, 2023
gcf-merge-on-green bot pushed a commit that referenced this pull request May 11, 2023
Reverts #722

This PR caused beam bigtableio.py failures https://togithub.com/apache/beam/issues/26673 and is blocking beam release. We're unclear why it caused the failure. So will revert this change, cut another release so we can unblock beam and investigate separately.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: bigtable Issues related to the googleapis/python-bigtable API. size: l Pull request size is large.
5 participants