Skip to content

Commit

Permalink
[refactored] IndefiniteSubject to live in material-motion-streams
Browse files Browse the repository at this point in the history
Summary:
As requested by @featherless in material-motion/indefinite-observable-js#14

It may eventually live in its own repo, but for now there's a desire to have the indefinite-observable package have an identical interface across the supported platforms, which means IndefiniteSubject needs a new home.  Until it has one, it lives here.

Reviewers: O3 Material JavaScript platform reviewers, #material_motion, O2 Material Motion, featherless

Reviewed By: #material_motion, O2 Material Motion, featherless

Subscribers: featherless

Tags: #material_motion

Differential Revision: http://codereview.cc/D2490
  • Loading branch information
appsforartists committed Jan 12, 2017
1 parent 514adc7 commit ec93284
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 1 deletion.
2 changes: 1 addition & 1 deletion packages/streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"test": "node ../../tools/test.js --only streams"
},
"dependencies": {
"indefinite-observable": "0.3.0",
"indefinite-observable": "1.0.0",
"tslib": "^1.2.0"
},
"devDependencies": {
Expand Down
93 changes: 93 additions & 0 deletions packages/streams/src/IndefiniteSubject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/** @license
* Copyright 2016 - present The Material Motion Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

import $$observable from 'symbol-observable';

import wrapWithObserver from 'indefinite-observable/dist/wrapWithObserver';

import {
Observable,
Observer,
ObserverOrNext,
Subscription,
} from 'indefinite-observable';

/**
* An IndefiniteSubject is both an Observer and an Observable. Whenever it
* receives a value on `next`, it forwards that value to any subscribed
* observers.
*
* IndefiniteSubject is a multicast Observable; it remembers the most recent
* value dispatched and passes it to any new subscriber.
*/
export class IndefiniteSubject<T> implements Observable<T>, Observer<T> {
// Keep track of all the observers who have subscribed, so we can notify them
// when we get new values. Note: JavaScript's Set collection is ordered.
_observers: Set<Observer<T>> = new Set();
_lastValue: T;
_hasStarted: boolean = false;

/**
* Passes the supplied value to any currently-subscribed observers. If an
* observer `subscribe`s before `next` is called again, it will immediately
* receive `value`.
*/
next(value: T) {
this._hasStarted = true;
this._lastValue = value;

// The parent stream has dispatched a value, so pass it along to all the
// children, and cache it for any observers that subscribe before the next
// dispatch.
this._observers.forEach(
(observer: Observer<T>) => observer.next(value)
);
}

/**
* `subscribe` accepts either a function or an object with a next method.
* `subject.next` will forward any value it receives to the function or method
* provided here.
*
* Call the returned `unsubscribe` method to stop receiving values on this
* particular observer.
*/
subscribe(observerOrNext: ObserverOrNext<T>): Subscription {
const observer = wrapWithObserver<T>(observerOrNext);

this._observers.add(observer);

if (this._hasStarted) {
observer.next(this._lastValue);
}

return {
unsubscribe: () => {
this._observers.delete(observer);
}
};
}

/**
* Tells other libraries that know about observables that we are one.
*
* https://github.com/tc39/proposal-observable#observable
*/
[$$observable](): Observable<T> {
return this;
}
}
export default IndefiniteSubject;
121 changes: 121 additions & 0 deletions packages/streams/src/__tests__/IndefiniteSubject.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/** @license
* Copyright 2016 - present The Material Motion Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

import { expect } from 'chai';

import {
beforeEach,
describe,
it,
} from 'mocha-sugar-free';

import {
spy,
} from 'sinon';

import IndefiniteSubject from '../IndefiniteSubject';

declare function require(name: string);

// chai really doesn't like being imported as an ES2015 module; will be fixed in v4
require('chai').use(
require('sinon-chai')
);

describe('IndefiniteSubject',
() => {
let subject;
let listener1;
let listener2;

beforeEach(
() => {
subject = new IndefiniteSubject();

listener1 = spy();
listener2 = spy();
}
);

it(`should not call a subscriber until next has been called`,
() => {
subject.subscribe(listener1);
expect(listener1).not.to.have.been.called;
}
);

it(`should call all subscribers when a new value is dispatched`,
() => {
subject.subscribe(listener1);
subject.subscribe(listener2);

subject.next(2);

expect(listener1).to.have.been.calledWith(2);
expect(listener2).to.have.been.calledWith(2);
}
);

it(`should remember its last value and dispatch it immediately to a new subscriber`,
() => {
subject.next(5);
subject.subscribe(listener1);
expect(listener1).to.have.been.calledWith(5);
}
);

it(`should stop calling subscribers who call unsubscribe`,
() => {
const subscription1 = subject.subscribe(listener1);
const subscription2 = subject.subscribe(listener2);

subject.next(1);

subscription1.unsubscribe();

subject.next(2);

expect(listener1).to.have.been.calledOnce;
expect(listener2).to.have.been.calledTwice;
}
);

it(`should accept an observer or an anonymous function`,
() => {
subject.subscribe({
next: listener1
});

subject.next(7);

expect(listener1).to.have.been.calledWith(7);
}
);

it(`should identify itself as an adherent of the TC39 observable proposal`,
() => {
// According to the TC39 spec, if Symbol is defined, `this` should be
// returned by stream[Symbol.observable](). Otherwise, the key is
// '@@observable'.
const $$observable = typeof Symbol !== 'undefined'
? (Symbol as any).observable
: '@@observable';

expect(subject[$$observable]()).to.equal(subject);
}
);
}
);
3 changes: 3 additions & 0 deletions packages/streams/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
export * from './types';
export * from './properties';

export * from './IndefiniteSubject';
export { default as IndefiniteSubject } from './IndefiniteSubject';

export * from './MotionObservable';
export { default as MotionObservable } from './MotionObservable';

Expand Down

0 comments on commit ec93284

Please sign in to comment.