-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#3821] Fix FluxBuffer
to request 1 when buffer is not modified
#3822
base: main
Are you sure you want to change the base?
Conversation
Hey, @Sage-Pierce ! I apologize it took some time to respond. I reworked the bufferTimeout operator with fair backpressure recently, that's the reason for it. Considering all existing buffer* implementations I believe their behaviour should be consistent across the offering. I wonder how this requirement would influence other implementations as it feels a bit risky in the face of concurrency. Can you share your thoughts? After looking at the codebase it seems the authors didn't consider this scenario and assumed there's no filtering happening and once an item is added, the collection's size increases. I wonder whether including this check and handling is the way to go or perhaps limiting the possible Let me know, thanks! |
Hi @chemicL 👋 No worries about the delay 😄 I didn't immediately look into the other
I don't think that there's any more risk with this change regarding concurrency, since
I don't think that there is currently another way to implement "give me |
I came up with this:
Just as a conversation starter :) I do imagine this doesn't look as nice and the performance would be incomparable. I'll try to digest the rest of the comments and review the other implementations next. For now, can you also prepare a few sample {input, output} sets so that we know what the end goal is? I mean a sequence 1, 2, 1, 3 would yield [1, 2] and [1, 3] for n == 2, but would yield [1, 2, 3] for n == 3. Is that desired? Can you share some real world scenarios that come to mind that this would benefit? I tend to first try to understand the need and then try to work towards a solution that matches the expectations. This potential mismatch regarding expected supplied aggregator types is puzzling and it would be neat if we could comprehensively address this. |
Ah nice, that was abstractly what I had in my head, but I couldn't come up with that
The test I wrote for this changeset covers the basic expectation I think, and it looks like you already understand my intent quite well. I'll just format those and a few more below: Given
In my use case, I am iterating over time-bucketed data elements (from a database) and executing an I/O-bound process on them (a service call). That service call is maximally efficient when passed |
Thanks. For Replace
with
and you can observe the same outcome. Out of the others, |
For the Given
Due to that significant difference in how Given that there isn't actually a "hanging" issue with Thoughts? |
Thanks for following up. I agree that
In my view, the current behaviour when presented with a I think in order to merge something we'd need to cover all As this currently doesn't work correctly nor consistently we should make an effort to bring more clarity in the docs and tests. For
For
I understand this requires more work so please let me know if you're still keen to contribute. I'd just like us to have a consistent UX across similar operators and that requires a holistic approach. I'll be away for a week but if you make any progress, please do commit and I'll review the changes when I'm back. Thanks again @Sage-Pierce and I look forward to where this discussion leads us :) |
@chemicL I don't mind taking a stab at all of that 😄 May take some time, but may have an updated review next week. |
d36d94b
to
4a47fa9
Compare
- Make `Collection` behavior consistent among all FluxBuffer* operators - Added several more tests for all FluxBuffer* operators covering usage of `Set`
…ators that take a `bufferSupplier`
@chemicL I believe I have addressed your feedback, and I look forward to your re-review when you return. I will be on vacation for the first half of July, so it may take me a bit to follow up on further feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the follow-up. Please have a look at my comments. For the skip != maxSize
case I have different expectations for the results in the test cases after reading the javadoc. These should:
- a) Either comply with the existing specification (new buffer after
skip
items were emitted by the source). - b) Or adjust the spec to the special case.
With a) it would be necessary to emit the current buffers despite their size being smaller than max but according to considered emitted items so far that fall into the window observed by a particular buffer... And maxSize == skip
should not be a special case. So this would lead to an inefficiency in just emitting smaller buffers and that's not your goal.
I think we're left with b) and I guess that in such a case the overlapping buffers would need to be smaller in size once they fall out of scope. For the disjoint case (skip >= maxSize
) we can just pretend that the discarded items were never emitted.
Additionally, please note the "Discard support" section in the relevant Javadocs -> they should also explain what's happening here.
Let me know your thoughts and thanks for the effort so far.
@@ -3123,6 +3129,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime) { | |||
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum | |||
* size OR the maxTime {@link Duration} elapses. | |||
* <p> | |||
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation | |||
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is | |||
* less than the specified max size. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* less than the specified max size. | |
* less than the specified max size. The element will be discarded in such a case. |
@@ -3163,6 +3173,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Schedule | |||
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum | |||
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}. | |||
* <p> | |||
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation | |||
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is | |||
* less than the specified max size. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same suggestion as above.
@@ -3230,6 +3244,10 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, | |||
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum | |||
* size OR the maxTime {@link Duration} elapses. | |||
* <p> | |||
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation | |||
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is | |||
* less than the specified max size. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same suggestion as above.
@@ -3254,6 +3272,10 @@ public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSiz | |||
* will be emitted by the returned {@link Flux} each time the buffer reaches a maximum | |||
* size OR the maxTime {@link Duration} elapses, as measured on the provided {@link Scheduler}. | |||
* <p> | |||
* Note that if buffers provided by bufferSupplier may return {@literal false} upon invocation | |||
* of {@link Collection#add(Object)}, buffer emission may be triggered when the buffer size is | |||
* less than the specified max size. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the same suggestion as above.
if (!b.add(t)) { | ||
Operators.onDiscard(t, this.ctx); | ||
s.request(1); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return here doesn't comply with the promise from the Javadoc:
Buffers can be created with gaps, as a new buffer will be created every time {@code skip} values have been emitted by the source
In this case what would happen is that the item that's not considered for the buffer would not be accounted in the skip
tracking, which doesn't refer to the buffer size, but to the emitted items (line 324 would need to be executed in any case). Being compliant would lead to a few more problems, because now we have a non-emitted buffer but the next step could create a new buffer... A few things to consider, suddenly this case would need to handle overlapping buffers, etc.
I suggest documenting this nuance for Collection
s that can return false
upon addition, that if items get discarded, the accounting is not applied. I think that would be ok, as currently the behaviour is already broken for this specific case (stalls happen due to no further requests taking place).
// It should never be the case that an element can be added to the first open | ||
// buffer and not all of them. Otherwise, the buffer behavior is non-deterministic, | ||
// and this operator's behavior is undefined. | ||
if (!b.add(t) && b == b0) { | ||
Operators.onDiscard(t, actual.currentContext()); | ||
s.request(1); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagine it can be the case. In a recent buffer the item is not a duplicate, but in the older buffers it can appear to be a duplicate. Therefore, an item can be discarded only provided that it is a duplicate in all currently tracked buffers. That also necessitates that the old buffers get emitted despite being smaller in size, otherwise there is a risk of leaking memory. Consider the following:
maxSize = 3
skip = 1
input: 1|2|1|2|1|2
step1: 1
buf1 = 1 // ok
step2: 2
buf1 = 1|2 // ok
buf2 = 2 // ok
step3: 1
buf1 = 1|2 // fail(1)
buf2 = 2|1 // ok
buf3 = 1 // ok
step4: 2
buf1 = 1|2 // fail(2)
buf2 = 2|1 // fail(2)
buf3 = 1|2 // ok
buf4 = 2 // ok
and so on.
Ah and regarding the build failure -> make sure to run |
After giving it some though, I wonder whether it makes sense to introduce a new operator for this particular purpose, like
which would emit the buffer once the condition is met and then start a new buffer. Usage:
And for the existing nuance with Collections that can return false, just document that it's not supported for buffer operators. WDYT? |
The more I think about it the more doubts I have :) I consulted RxJava's codebase, which shares the implementation of the To add to that, the suggestion I made with using Consider the following idea for your use case (with artificial delays introduced):
|
I feel I've re-invented the |
If a Set is used as the destination in
Flux.buffer
, the stream will not hang if/when there are duplicates in a given bufferPreviously,
FluxBuffer
was not taking the result of adding to the buffer into account. If adding to the buffer does not result in modifying it, an extrarequest(1)
should be issued.Fixes #3821