Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine Interoperability: first iteration #776

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ PlaygroundUtility.remap
# SwiftPM
.build
Packages
.swiftpm

# Carthage
Carthage/Build
Expand Down
38 changes: 38 additions & 0 deletions ReactiveSwift.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@
9A1D067D1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
9A1D067E1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
9A1D067F1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
9A5865DF244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; };
9A5865E0244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; };
9A5865E1244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; };
9A5865E2244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; };
9A5865E5244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; };
9A5865E6244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; };
9A5865E7244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; };
9A5865E8244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; };
9A5865EA244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; };
9A5865EB244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; };
9A5865EC244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; };
9A5865ED244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; };
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
Expand Down Expand Up @@ -240,6 +252,9 @@
9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingPropertySpec.swift; sourceTree = "<group>"; };
9A1B824020835EEC00EB7C09 /* ResultExtensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultExtensions.swift; sourceTree = "<group>"; };
9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBindingSpec.swift; sourceTree = "<group>"; };
9A5865DE244CEF9800AADB58 /* ToCombine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToCombine.swift; sourceTree = "<group>"; };
9A5865E4244CFD4900AADB58 /* FromCombine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FromCombine.swift; sourceTree = "<group>"; };
9A5865E9244CFE9000AADB58 /* Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Utilities.swift; sourceTree = "<group>"; };
9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = "<group>"; };
9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = "<group>"; };
9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -380,6 +395,16 @@
path = tvOS;
sourceTree = "<group>";
};
9A5865E3244CFD2A00AADB58 /* CombineInteroperability */ = {
isa = PBXGroup;
children = (
9A5865DE244CEF9800AADB58 /* ToCombine.swift */,
9A5865E4244CFD4900AADB58 /* FromCombine.swift */,
9A5865E9244CFE9000AADB58 /* Utilities.swift */,
);
path = CombineInteroperability;
sourceTree = "<group>";
};
A97451321B3A935E00F48E55 /* watchOS */ = {
isa = PBXGroup;
children = (
Expand Down Expand Up @@ -454,6 +479,7 @@
9A090C131DA0309E00EE97CA /* Reactive.swift */,
D0C312C819EF2A5800984962 /* Scheduler.swift */,
C79B647B1CD52E23003F2376 /* EventLogger.swift */,
9A5865E3244CFD2A00AADB58 /* CombineInteroperability */,
D03B4A3919F4C25F009E02AC /* Signals */,
D03B4A3B19F4C281009E02AC /* Extensions */,
9ABCB1841D2A5B5A00BCA243 /* Deprecations+Removals.swift */,
Expand Down Expand Up @@ -869,6 +895,8 @@
57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */,
57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */,
9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
9A5865E2244CEF9800AADB58 /* ToCombine.swift in Sources */,
9A5865ED244CFE9000AADB58 /* Utilities.swift in Sources */,
57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */,
57A4D1BA1BA13D7A00F7D4B1 /* Property.swift in Sources */,
9A090C171DA0309E00EE97CA /* Reactive.swift in Sources */,
Expand All @@ -881,6 +909,7 @@
57A4D1C01BA13D7A00F7D4B1 /* FoundationExtensions.swift in Sources */,
D85C652D1C0E70E5005A77AD /* Flatten.swift in Sources */,
9ABCB1881D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
9A5865E8244CFD4900AADB58 /* FromCombine.swift in Sources */,
EBCC7DBF1BBF01E200A2AE92 /* Observer.swift in Sources */,
C79B64801CD52E4E003F2376 /* EventLogger.swift in Sources */,
4A0E11021D2A92720065D310 /* Lifetime.swift in Sources */,
Expand Down Expand Up @@ -924,6 +953,8 @@
A9B315BE1B3940810001CB9C /* Event.swift in Sources */,
A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */,
9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
9A5865E1244CEF9800AADB58 /* ToCombine.swift in Sources */,
9A5865EC244CFE9000AADB58 /* Utilities.swift in Sources */,
A9B315C11B3940810001CB9C /* Action.swift in Sources */,
A9B315C21B3940810001CB9C /* Property.swift in Sources */,
9A090C161DA0309E00EE97CA /* Reactive.swift in Sources */,
Expand All @@ -936,6 +967,7 @@
A9B315C81B3940810001CB9C /* FoundationExtensions.swift in Sources */,
D85C652C1C0E70E4005A77AD /* Flatten.swift in Sources */,
9ABCB1871D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
9A5865E7244CFD4900AADB58 /* FromCombine.swift in Sources */,
EBCC7DBE1BBF01E200A2AE92 /* Observer.swift in Sources */,
C79B647F1CD52E4D003F2376 /* EventLogger.swift in Sources */,
4A0E11011D2A92720065D310 /* Lifetime.swift in Sources */,
Expand All @@ -952,6 +984,8 @@
D0C312D319EF2A5800984962 /* Disposable.swift in Sources */,
9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */,
EBCC7DBC1BBF010C00A2AE92 /* Observer.swift in Sources */,
9A5865DF244CEF9800AADB58 /* ToCombine.swift in Sources */,
9A5865EA244CFE9000AADB58 /* Utilities.swift in Sources */,
D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */,
9A090C141DA0309E00EE97CA /* Reactive.swift in Sources */,
D08C54B31A69A2AE00AD8286 /* Signal.swift in Sources */,
Expand All @@ -964,6 +998,7 @@
D0C312CD19EF2A5800984962 /* Atomic.swift in Sources */,
D08C54BA1A69C54300AD8286 /* Property.swift in Sources */,
D0D11AB91A6AE87700C1F8B1 /* Action.swift in Sources */,
9A5865E5244CFD4900AADB58 /* FromCombine.swift in Sources */,
C79B647C1CD52E23003F2376 /* EventLogger.swift in Sources */,
9ABCB1851D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
D08C54B81A69A9D000AD8286 /* SignalProducer.swift in Sources */,
Expand Down Expand Up @@ -1007,6 +1042,8 @@
D0C312D419EF2A5800984962 /* Disposable.swift in Sources */,
D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */,
9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */,
9A5865E0244CEF9800AADB58 /* ToCombine.swift in Sources */,
9A5865EB244CFE9000AADB58 /* Utilities.swift in Sources */,
9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
EBCC7DBD1BBF01E100A2AE92 /* Observer.swift in Sources */,
9A090C151DA0309E00EE97CA /* Reactive.swift in Sources */,
Expand All @@ -1019,6 +1056,7 @@
D08C54B71A69A3DB00AD8286 /* Event.swift in Sources */,
C79B647D1CD52E4A003F2376 /* EventLogger.swift in Sources */,
D0C312CE19EF2A5800984962 /* Atomic.swift in Sources */,
9A5865E6244CFD4900AADB58 /* FromCombine.swift in Sources */,
D0C312E819EF2A5800984962 /* Scheduler.swift in Sources */,
D0C312D019EF2A5800984962 /* Bag.swift in Sources */,
D0D11ABA1A6AE87700C1F8B1 /* Action.swift in Sources */,
Expand Down
22 changes: 22 additions & 0 deletions Sources/CombineInteroperability/FromCombine.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#if canImport(Combine)
import Combine

@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *)
extension Publisher {
public func producer() -> SignalProducer<Output, Failure> {
return SignalProducer { observer, lifetime in
lifetime += self.sink(
receiveCompletion: { completion in
switch completion {
case let .failure(error):
observer.send(error: error)
case .finished:
observer.sendCompleted()
}
},
receiveValue: observer.send(value:)
)
}
}
}
#endif
179 changes: 179 additions & 0 deletions Sources/CombineInteroperability/ToCombine.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#if canImport(Combine)
import Combine

extension SignalProducerConvertible {
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *)
public func eraseToAnyPublisher() -> AnyPublisher<Value, Error> {
publisher().eraseToAnyPublisher()
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *)
public func publisher() -> ProducerPublisher<Value, Error> {
ProducerPublisher(base: producer)
}
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *)
public struct ProducerPublisher<Output, Failure: Swift.Error>: Publisher {
public let base: SignalProducer<Output, Failure>

public init(base: SignalProducer<Output, Failure>) {
self.base = base
}

public func receive<S>(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure {
let subscription = ProducerSubscription(subscriber: subscriber, base: base)
subscription.bootstrap()
}
}

final class ProducerSubscription<S: Subscriber>: Combine.Subscription {
typealias Output = S.Output

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been renamed to .Input

typealias Failure = S.Failure

let subscriber: S
let base: SignalProducer<Output, Failure>
let state: Atomic<State>

init(subscriber: S, base: SignalProducer<Output, Failure>) {
self.subscriber = subscriber
self.base = base
self.state = Atomic(State())
}

func bootstrap() {
subscriber.receive(subscription: self)
}

func request(_ incoming: Subscribers.Demand) {
let response: DemandResponse = state.modify { state in
guard state.hasCancelled == false else {
return .noAction
}

guard state.hasStarted else {
state.hasStarted = true
state.requested = incoming
return .startUpstream
}

state.requested = state.requested + incoming

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be rewritten state.requested += incoming

let unsatified = state.requested - state.satisfied

if let max = unsatified.max {
let dequeueCount = Swift.min(state.buffer.count, max)
state.satisfied += dequeueCount

defer { state.buffer.removeFirst(dequeueCount) }
return .satisfyDemand(Array(state.buffer.prefix(dequeueCount)))
} else {
defer { state.buffer = [] }
return .satisfyDemand(state.buffer)
}
}

switch response {
case let .satisfyDemand(output):
var demand: Subscribers.Demand = .none

for output in output {
demand += subscriber.receive(output)
}

if demand != .none {
request(demand)
}

case .startUpstream:
let disposable = base.start { [weak self] event in
guard let self = self else { return }

switch event {
case let .value(output):
let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in
guard state.hasCancelled == false else { return (false, false) }

let unsatified = state.requested - state.satisfied

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[typo] unsatisfied ☺️


if let count = unsatified.max, count >= 1 {
assert(state.buffer.count == 0)
state.satisfied += 1
return (true, false)
} else if unsatified == .unlimited {
assert(state.buffer.isEmpty)
return (true, true)
} else {
assert(state.requested == state.satisfied)
state.buffer.append(output)
return (false, false)
}
}

if shouldSendImmediately {
let demand = self.subscriber.receive(output)

if isDemandUnlimited == false && demand != .none {
self.request(demand)
}
}

case .completed, .interrupted:
self.cancel()
self.subscriber.receive(completion: .finished)

case let .failed(error):
self.cancel()
self.subscriber.receive(completion: .failure(error))
}
}

let shouldDispose: Bool = state.modify { state in
guard state.hasCancelled == false else { return true }
state.producerSubscription = disposable
return false
}

if shouldDispose {
disposable.dispose()
}

case .noAction:
break
}
}

func cancel() {
let disposable = state.modify { $0.cancel() }
disposable?.dispose()
}

struct State {
var requested: Subscribers.Demand = .none
var satisfied: Subscribers.Demand = .none

var buffer: [Output] = []

var producerSubscription: Disposable?
var hasStarted = false
var hasCancelled = false

init() {
producerSubscription = nil
hasStarted = false
hasCancelled = false
}

mutating func cancel() -> Disposable? {
hasCancelled = true
defer { producerSubscription = nil }
return producerSubscription
}
}

enum DemandResponse {
case startUpstream
case satisfyDemand([Output])
case noAction
}
}
#endif
12 changes: 12 additions & 0 deletions Sources/CombineInteroperability/Utilities.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#if canImport(Combine)
import Combine

extension Lifetime {
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *)
@discardableResult
public static func += <C: Cancellable>(lhs: Lifetime, rhs: C?) -> Disposable? {
rhs.flatMap { lhs.observeEnded($0.cancel) }
}
}
#endif