Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine Interoperability: first iteration #776

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Next Next commit
Convert any Signal, Producer or Property to a Combine publisher via `…
…publisher()`.
  • Loading branch information
andersio committed Apr 19, 2020
commit d718b1706a00df0ea7333c9258255129bbb2a3aa
10 changes: 10 additions & 0 deletions ReactiveSwift.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
9A1D067D1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
9A1D067E1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
9A1D067F1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
Expand Down Expand Up @@ -240,6 +244,7 @@
9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingPropertySpec.swift; sourceTree = "<group>"; };
9A1B824020835EEC00EB7C09 /* ResultExtensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultExtensions.swift; sourceTree = "<group>"; };
9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBindingSpec.swift; sourceTree = "<group>"; };
9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CombineInteroperability.swift; sourceTree = "<group>"; };
9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = "<group>"; };
9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = "<group>"; };
9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -454,6 +459,7 @@
9A090C131DA0309E00EE97CA /* Reactive.swift */,
D0C312C819EF2A5800984962 /* Scheduler.swift */,
C79B647B1CD52E23003F2376 /* EventLogger.swift */,
9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */,
D03B4A3919F4C25F009E02AC /* Signals */,
D03B4A3B19F4C281009E02AC /* Extensions */,
9ABCB1841D2A5B5A00BCA243 /* Deprecations+Removals.swift */,
Expand Down Expand Up @@ -869,6 +875,7 @@
57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */,
57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */,
9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */,
57A4D1BA1BA13D7A00F7D4B1 /* Property.swift in Sources */,
9A090C171DA0309E00EE97CA /* Reactive.swift in Sources */,
Expand Down Expand Up @@ -924,6 +931,7 @@
A9B315BE1B3940810001CB9C /* Event.swift in Sources */,
A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */,
9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
A9B315C11B3940810001CB9C /* Action.swift in Sources */,
A9B315C21B3940810001CB9C /* Property.swift in Sources */,
9A090C161DA0309E00EE97CA /* Reactive.swift in Sources */,
Expand Down Expand Up @@ -952,6 +960,7 @@
D0C312D319EF2A5800984962 /* Disposable.swift in Sources */,
9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */,
EBCC7DBC1BBF010C00A2AE92 /* Observer.swift in Sources */,
9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */,
9A090C141DA0309E00EE97CA /* Reactive.swift in Sources */,
D08C54B31A69A2AE00AD8286 /* Signal.swift in Sources */,
Expand Down Expand Up @@ -1007,6 +1016,7 @@
D0C312D419EF2A5800984962 /* Disposable.swift in Sources */,
D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */,
9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */,
9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
EBCC7DBD1BBF01E100A2AE92 /* Observer.swift in Sources */,
9A090C151DA0309E00EE97CA /* Reactive.swift in Sources */,
Expand Down
179 changes: 179 additions & 0 deletions Sources/CombineInteroperability.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#if canImport(Combine)
import Combine

extension SignalProducerConvertible {
@available(macOS 10.15, *)
@available(iOS 13.0, *)
@available(tvOS 13.0, *)
@available(macCatalyst 13.0, *)
@available(watchOS 6.0, *)
public func publisher() -> ProducerPublisher<Value, Error> {
ProducerPublisher(base: producer)
}
}

@available(macOS 10.15, *)
@available(iOS 13.0, *)
@available(tvOS 13.0, *)
@available(macCatalyst 13.0, *)
@available(watchOS 6.0, *)
public struct ProducerPublisher<Output, Failure: Swift.Error>: Publisher {
public let base: SignalProducer<Output, Failure>

public init(base: SignalProducer<Output, Failure>) {
self.base = base
}

public func receive<S>(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure {
let subscription = Subscription(subscriber: subscriber, base: base)
subscription.bootstrap()
}

final class Subscription<S: Subscriber>: Combine.Subscription where Output == S.Input, Failure == S.Failure {
let subscriber: S
let base: SignalProducer<Output, Failure>
let state: Atomic<State>

init(subscriber: S, base: SignalProducer<Output, Failure>) {
self.subscriber = subscriber
self.base = base
self.state = Atomic(State())
}

func bootstrap() {
subscriber.receive(subscription: self)
}

func request(_ incoming: Subscribers.Demand) {
let response: DemandResponse = state.modify { state in
guard state.hasCancelled == false else {
return .noAction
}

guard state.hasStarted else {
state.hasStarted = true
state.requested = incoming
return .startUpstream
}

state.requested = state.requested + incoming
let unsatified = state.requested - state.satisfied

if let max = unsatified.max {
let dequeueCount = Swift.min(state.buffer.count, max)
state.satisfied += dequeueCount

defer { state.buffer.removeFirst(dequeueCount) }
return .satisfyDemand(Array(state.buffer.prefix(dequeueCount)))
} else {
defer { state.buffer = [] }
return .satisfyDemand(state.buffer)
}
}

switch response {
case let .satisfyDemand(output):
var demand: Subscribers.Demand = .none

for output in output {
demand += subscriber.receive(output)
}

if demand != .none {
request(demand)
}

case .startUpstream:
let disposable = base.start { [weak self] event in
guard let self = self else { return }

switch event {
case let .value(output):
let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in
guard state.hasCancelled == false else { return (false, false) }

let unsatified = state.requested - state.satisfied

if let count = unsatified.max, count >= 1 {
assert(state.buffer.count == 0)
state.satisfied += 1
return (true, false)
} else if unsatified == .unlimited {
assert(state.buffer.isEmpty)
return (true, true)
} else {
assert(state.requested == state.satisfied)
state.buffer.append(output)
return (false, false)
}
}

if shouldSendImmediately {
let demand = self.subscriber.receive(output)

if isDemandUnlimited == false && demand != .none {
self.request(demand)
}
}

case .completed, .interrupted:
self.cancel()
self.subscriber.receive(completion: .finished)

case let .failed(error):
self.cancel()
self.subscriber.receive(completion: .failure(error))
}
}

let shouldDispose: Bool = state.modify { state in
guard state.hasCancelled == false else { return true }
state.producerSubscription = disposable
return false
}

if shouldDispose {
disposable.dispose()
}

case .noAction:
break
}
}

func cancel() {
let disposable = state.modify { $0.cancel() }
disposable?.dispose()
}

struct State {
var requested: Subscribers.Demand = .none
var satisfied: Subscribers.Demand = .none

var buffer: [Output] = []

var producerSubscription: Disposable?
var hasStarted = false
var hasCancelled = false

init() {
producerSubscription = nil
hasStarted = false
hasCancelled = false
}

mutating func cancel() -> Disposable? {
hasCancelled = true
defer { producerSubscription = nil }
return producerSubscription
}
}

enum DemandResponse {
case startUpstream
case satisfyDemand([Output])
case noAction
}
}
}
#endif