Interface Stream

The actual Stream constructor, wrapped by the main exported function.

Methods

public addListener(event: string, listener: Function): EventEmitter

Parameters

  • event: string
  • listener: Function

Returns

EventEmitter

public append(y: R in Highland.Stream<R>): Stream

Adds a value to the end of a Stream.

id

append

section

Streams

name

Stream.append(y)

api

public

Parameters

  • y: R in Highland.Stream<R>
    • the value to append to the Stream

Returns

Stream

public apply(f: Function)

Applies results from a Stream as arguments to a function.

id

apply

section

Streams

name

Stream.apply(f)

api

public

Parameters

  • f: Function
    • the function to apply arguments to

public collect(): Stream

Groups all values into an Array and passes it down the stream as a single data event. This is a bit like toArray, but instead of accepting a callback and causing a thunk, it passes the value on.

id

collect

section

Streams

name

Stream.collect()

api

public

Returns

Stream

public compact(): Stream

Filters a Stream to drop all non-truthy values.

id

compact

section

Streams

name

Stream.compact()

api

public

Returns

Stream

public concat(ys: Stream): Stream

Concatenates a Stream to the end of this Stream.

Be aware that in the top-level export, the args may be in the reverse order to what you'd expect _([a], [b]) => [b, a], as this follows the convention of other top-level exported functions which do x to y.

id

concat

section

Streams

name

Stream.concat(ys)

api

public

Parameters

  • ys: Stream
    • the values to concatenate onto this Stream

Returns

Stream

public concat(ys: Array<R>): Stream

Parameters

  • ys: Array<R>

Returns

Stream

public consume(f: (err: Error, x: R, push: (err: Error, value?: U) => void, next: () => void) => void): Stream

Consumes values from a Stream (once resumed) and returns a new Stream for you to optionally push values onto using the provided push / next functions.

This function forms the basis of many higher-level Stream operations. It will not cause a paused stream to immediately resume, but behaves more like a 'through' stream, handling values as they are read.

id

consume

section

Streams

name

Stream.consume(f)

api

public

Parameters

  • f: (err: Error, x: R, push: (err: Error, value?: U) => void, next: () => void) => void
    • the function to handle errors and values

Returns

Stream
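The push / next protocol described above can be illustrated with a small synchronous model over a plain array. This is only a sketch and not Highland's implementation: `consumeArray` and the `NIL` marker are hypothetical names standing in for a real Stream and its nil value, and errors pushed by the handler are simply thrown.

```typescript
// Hypothetical end-of-stream marker standing in for Highland's nil value.
const NIL = Symbol("nil");
type Item<T> = T | typeof NIL;
type Push<U> = (err: Error | null, value?: Item<U>) => void;
type ConsumeHandler<R, U> = (
  err: Error | null,
  x: Item<R>,
  push: Push<U>,
  next: () => void
) => void;

// Feeds each array element (then NIL) to the handler, but only while
// the handler keeps calling next().
function consumeArray<R, U>(source: R[], f: ConsumeHandler<R, U>): U[] {
  const out: U[] = [];
  let index = 0;
  let wantsMore = true;
  const push: Push<U> = (err, value) => {
    if (err) throw err; // this model surfaces errors by throwing
    if (value !== undefined && value !== NIL) out.push(value as U);
  };
  const next = (): void => {
    wantsMore = true;
  };
  while (wantsMore && index <= source.length) {
    wantsMore = false;
    const x: Item<R> = index < source.length ? source[index] : NIL;
    index += 1;
    f(null, x, push, next);
  }
  return out;
}

// A filter written in terms of the consume-style handler.
const evens = consumeArray<number, number>([1, 2, 3, 4], (_err, x, push, next) => {
  if (x === NIL) {
    push(null, NIL); // forward the end marker and stop asking for more
  } else {
    if (x % 2 === 0) push(null, x);
    next(); // request the following value
  }
});
```

The handler decides both what to emit (via push) and whether to keep reading (via next), which is why higher-level operations can be built on it.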

public debounce(ms: number): Stream

Holds off pushing data events downstream until there has been no more data for ms milliseconds. Sends the last value that occurred before the delay, discarding all other values.

id

debounce

section

Streams

name

Stream.debounce(ms)

api

public

Parameters

  • ms: number
    • the milliseconds to wait before sending data

Returns

Stream
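To make the debounce rule concrete, the sketch below models it on a list of timestamped values rather than on a live Stream. It assumes a value is released only when the following value arrives at least ms later, and that the final value is released when the input ends; `TimedValue` and `debounceModel` are hypothetical names, and the exact boundary behaviour of the library may differ.

```typescript
interface TimedValue<T> {
  t: number; // arrival time in milliseconds
  value: T;
}

// Keeps a value only if the input then stayed quiet for at least `ms`.
function debounceModel<T>(events: TimedValue<T>[], ms: number): T[] {
  const out: T[] = [];
  events.forEach((e, i) => {
    const following = events[i + 1];
    if (following === undefined || following.t - e.t >= ms) {
      out.push(e.value);
    }
  });
  return out;
}

const debounced = debounceModel(
  [
    { t: 0, value: "a" },
    { t: 10, value: "b" }, // arrives 10 ms after "a", so "a" is discarded
    { t: 200, value: "c" },
  ],
  100
);
```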

public destroy()

Destroys a stream by unlinking it from any consumers and sources. This will stop all consumers from receiving events from this stream and removes this stream as a consumer of any source stream.

This function calls end() on the stream and unlinks it from any piped-to streams.

id

destroy

section

Streams

name

Stream.destroy()

api

public

public each(f: (x: R) => void)

Iterates over every value from the Stream, calling the iterator function on each of them. This function causes a thunk.

If an error from the Stream reaches the each call, it will emit an error event (which will cause it to throw if unhandled).

id

each

section

Streams

name

Stream.each(f)

api

public

Parameters

  • f: (x: R) => void
    • the iterator function

public emit(event: string, args?: Array<any>): boolean

Parameters

  • event: string
  • args?: Array<any> optional

Returns

boolean

public end()

Ends a Stream. This is the same as sending a nil value as data. You shouldn't need to call this directly, rather it will be called by any Node Readable Streams you pipe in.

id

end

section

Streams

name

Stream.end()

api

public

public errors(f: (err: Error, push: (err: Error, x?: R) => void) => void): Stream

Extracts errors from a Stream and applies them to an error handler function. Returns a new Stream with the errors removed (unless the error handler chooses to rethrow them using push). Errors can also be transformed and put back onto the Stream as values.

id

errors

section

Streams

name

Stream.errors(f)

api

public

Parameters

  • f: (err: Error, push: (err: Error, x?: R) => void) => void
    • the function to pass all errors to

Returns

Stream
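The error-handling contract above can be sketched with a plain-array model in which a stream is a list of value and error entries. This is an illustration under stated assumptions, not the library's code: `StreamEntry` and `errorsModel` are hypothetical names.

```typescript
type StreamEntry<T> =
  | { kind: "value"; value: T }
  | { kind: "error"; error: Error };

// Passes values through untouched and hands every error to `handler`,
// which may rethrow it or replace it with a value via push.
function errorsModel<T>(
  entries: StreamEntry<T>[],
  handler: (err: Error, push: (err: Error | null, x?: T) => void) => void
): StreamEntry<T>[] {
  const out: StreamEntry<T>[] = [];
  const push = (err: Error | null, x?: T): void => {
    if (err) out.push({ kind: "error", error: err });
    else if (x !== undefined) out.push({ kind: "value", value: x });
  };
  for (const entry of entries) {
    if (entry.kind === "error") handler(entry.error, push);
    else out.push(entry);
  }
  return out;
}

// Replace each error with the placeholder value -1.
const recovered = errorsModel<number>(
  [
    { kind: "value", value: 1 },
    { kind: "error", error: new Error("boom") },
    { kind: "value", value: 2 },
  ],
  (_err, push) => push(null, -1)
);
```

A handler that calls neither form of push simply drops the error, which matches the "errors removed" default described above.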

public filter(f: (x: R) => boolean): Stream

Creates a new Stream including only the values which pass a truth test.

id

filter

section

Streams

name

Stream.filter(f)

api

public

Parameters

  • f: (x: R) => boolean
    • the truth test function

Returns

Stream

public find(f: (x: R) => boolean): Stream

A convenient form of filter, which returns the first object from a Stream that passes the provided truth test.

id

find

section

Streams

name

Stream.find(f)

api

public

Parameters

  • f: (x: R) => boolean
    • the truth test function

Returns

Stream

public flatFilter(f: (x: R) => Highland.Stream<boolean>): Stream

Filters using a predicate which returns a Stream. If you need to check against an asynchronous data source when filtering a Stream, this can be convenient. The Stream returned from the filter function should have a Boolean as its first value (all other values on the Stream will be disregarded).

id

flatFilter

section

Streams

name

Stream.flatFilter(f)

api

public

Parameters

  • f: (x: R) => Highland.Stream<boolean>
    • the truth test function which returns a Stream

Returns

Stream

public flatMap(f: (x: R) => Highland.Stream<U>): Stream

Creates a new Stream of values by applying each item in a Stream to an iterator function which may return a Stream. Each item on these result Streams is then emitted on a single output Stream.

The same as calling stream.map(f).flatten().

id

flatMap

section

Streams

name

Stream.flatMap(f)

api

public

Parameters

  • f: (x: R) => Highland.Stream<U>
    • the iterator function

Returns

Stream

public flatMap(f: (x: R) => U): Stream

Parameters

  • f: (x: R) => U

Returns

Stream

public flatten(): Stream

Recursively reads values from a Stream which may contain nested Streams or Arrays. As values or errors are encountered, they are emitted on a single output Stream.

id

flatten

section

Streams

name

Stream.flatten()

api

public

Returns

Stream

public flatten(): Stream

Returns

Stream
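As a rough picture of the recursive reading described for flatten, the following sketch flattens nested arrays of numbers. It models nested Streams as nested arrays only; `NestedNumbers` and `flattenModel` are hypothetical names, and real Streams would be read asynchronously.

```typescript
type NestedNumbers = number | NestedNumbers[];

// Depth-first walk that emits leaf values in the order they are encountered.
function flattenModel(xs: NestedNumbers[]): number[] {
  const out: number[] = [];
  for (const x of xs) {
    if (Array.isArray(x)) out.push(...flattenModel(x));
    else out.push(x);
  }
  return out;
}

const flat = flattenModel([1, [2, [3, 4]], [[5]]]);
```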

public fork(): Stream

Forks a stream, allowing you to add additional consumers with shared back-pressure. A stream forked to multiple consumers will only pull values from its source as fast as the slowest consumer can handle them.

id

fork

section

Streams

name

Stream.fork()

api

public

Returns

Stream

public group(f: (x: R) => string): Stream

A convenient form of reduce, which groups items based on a function or property name.

id

group

section

Streams

name

Stream.group(f)

api

public

Parameters

  • f: (x: R) => string
    • the function or property name on which to group; toString() is called on the result of a function

Returns

Stream

public group(prop: string): Stream

Parameters

  • prop: string

Returns

Stream
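Both overloads of group (function or property name) can be pictured with a plain-array reduction. The sketch assumes the grouped result is a single object mapping each key to the array of matching items; `GroupRow` and `groupModel` are hypothetical names, and the key is converted with `String`, mirroring the toString() note above.

```typescript
type GroupRow = Record<string, unknown>;

// Groups rows by a property name or by the result of a key function.
function groupModel(
  items: GroupRow[],
  key: string | ((x: GroupRow) => unknown)
): Record<string, GroupRow[]> {
  const groups: Record<string, GroupRow[]> = {};
  for (const item of items) {
    const k = String(typeof key === "function" ? key(item) : item[key]);
    if (groups[k] === undefined) groups[k] = [];
    groups[k].push(item);
  }
  return groups;
}

const docs: GroupRow[] = [
  { name: "foo", type: "blogpost" },
  { name: "bar", type: "blogpost" },
  { name: "baz", type: "comment" },
];
const byProp = groupModel(docs, "type");
const byFunc = groupModel(docs, (doc) => doc.type);
```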

public head(): Stream

Creates a new Stream with only the first value from the source.

id

head

section

Streams

name

Stream.head()

api

public

_([1, 2, 3, 4]).head() // => 1

Returns

Stream

public invoke(method: string, args: Array<any>): Stream

Calls a named method on each object from the Stream - returning a new stream with the result of those calls.

id

invoke

section

Streams

name

Stream.invoke(method, args)

api

public

Parameters

  • method: string
    • the method name to call
  • args: Array<any>
    • the arguments to call the method with

Returns

Stream

public last(): Stream

Drops all values from the Stream apart from the last one (if any).

id

last

section

Streams

name

Stream.last()

api

public

Returns

Stream

public latest(): Stream

Creates a new Stream, which when read from, only returns the last seen value from the source. The source stream does not experience back-pressure. Useful if you're using a Stream to model a changing property which you need to query periodically.

id

latest

section

Streams

name

Stream.latest()

api

public

Returns

Stream

public listeners(event: string): Array<Function>

Parameters

  • event: string

Returns

Array<Function>

public map(f: (x: R) => U): Stream

Creates a new Stream of transformed values by applying a function to each value from the source. The transformation function can be replaced with a non-function value for convenience, and it will emit that value for every data event on the source Stream.

id

map

section

Streams

name

Stream.map(f)

api

public

Parameters

  • f: (x: R) => U
    • the transformation function or value to map to

Returns

Stream

public merge(ys: Stream): Stream

Takes a Stream of Streams and merges their values and errors into a single new Stream. The merged stream ends when all source streams have ended.

Note that no guarantee is made with respect to the order in which values for each stream end up in the merged stream. Values in the merged stream will, however, respect the order they were emitted from their respective streams.

id

merge

section

Streams

name

Stream.merge()

api

public

var txt = _(['foo.txt', 'bar.txt']).map(readFile)
var md = _(['baz.md']).map(readFile)

_([txt, md]).merge();
// => contents of foo.txt, bar.txt and baz.md in the order they were read

Parameters

Returns

Stream

public observe(): Stream

Observes a stream, allowing you to handle values as they are emitted, without adding back-pressure or causing data to be pulled from the source. This can be useful when you are performing two related queries on a stream where one would block the other. Just be aware that a slow observer could fill up its buffer and cause memory issues. Where possible, you should use fork.

id

observe

section

Streams

name

Stream.observe()

api

public

Returns

Stream

public on(event: string, listener: Function): EventEmitter

Parameters

  • event: string
  • listener: Function

Returns

EventEmitter

public once(event: string, listener: Function): EventEmitter

Parameters

  • event: string
  • listener: Function

Returns

EventEmitter

public otherwise(ys: Stream): Stream

Switches source to an alternate Stream if the current Stream is empty.

id

otherwise

section

Streams

name

Stream.otherwise(ys)

api

public

Parameters

  • ys: Stream
    • alternate stream to use if this stream is empty

Returns

Stream

public parallel(n: number): Stream

Takes a Stream of Streams and reads from them in parallel, buffering the results until they can be returned to the consumer in their original order.

id

parallel

section

Streams

name

Stream.parallel(n)

api

public

Parameters

  • n: number
    • the maximum number of concurrent reads/buffers

Returns

Stream

public pause()

Pauses the stream. All Highland Streams start in the paused state.

id

pause

section

Streams

name

Stream.pause()

api

public

public pipe(dest: Stream): Stream

Pipes a Highland Stream to a Node Writable Stream (Highland Streams are also Node Writable Streams). This will pull all the data from the source Highland Stream and write it to the destination, automatically managing flow so that the destination is not overwhelmed by a fast source.

This function returns the destination so you can chain together pipe calls.

id

pipe

section

Streams

name

Stream.pipe(dest)

api

public

Parameters

  • dest: Stream
    • the destination to write all data to

Returns

Stream

public pipe(dest: ReadWriteStream): Stream

Parameters

Returns

Stream

public pipe(dest: WritableStream)

Parameters

public pluck(prop: string): Stream

Retrieves values associated with a given property from all elements in the collection.

id

pluck

section

Streams

name

Stream.pluck(property)

api

public

Parameters

  • prop: string
    • the property whose values should be retrieved

Returns

Stream

public pull(f: (err: Error, x: R) => void)

Consumes a single item from the Stream. Unlike consume, this function will not provide a new stream for you to push values onto, and it will unsubscribe as soon as it has a single error, value or nil from the source.

You probably won't need to use this directly, but it is used internally by some functions in the Highland library.

id

pull

section

Streams

name

Stream.pull(f)

api

public

Parameters

  • f: (err: Error, x: R) => void
    • the function to handle data

public reduce(memo: U, f: (memo: U, x: R) => U): Stream

Boils down a Stream to a single value. The memo is the initial state of the reduction, and each successive step of it should be returned by the iterator function. The iterator is passed two arguments: the memo and the next value.

id

reduce

section

Streams

name

Stream.reduce(memo, iterator)

api

public

Parameters

  • memo: U
    • the initial state of the reduction
  • f: (memo: U, x: R) => U

Returns

Stream

public reduce1(memo: U, f: (memo: U, x: R) => U): Stream

Same as reduce, but uses the first element as the initial state instead of passing in a memo value.

id

reduce1

section

Streams

name

Stream.reduce1(iterator)

api

public

Parameters

  • memo: U
  • f: (memo: U, x: R) => U

Returns

Stream

public reject(f: (x: R) => boolean): Stream

The inverse of filter.

id

reject

section

Streams

name

Stream.reject(f)

api

public

var odds = _([1, 2, 3, 4]).reject(function (x) { return x % 2 === 0; });

Parameters

  • f: (x: R) => boolean
    • the truth test function

Returns

Stream

public removeAllListeners(event?: string): EventEmitter

Parameters

  • event?: string optional

Returns

EventEmitter

public removeListener(event: string, listener: Function): EventEmitter

Parameters

  • event: string
  • listener: Function

Returns

EventEmitter

public resume()

Resumes a paused Stream. This will either read from the Stream's incoming buffer or request more data from an upstream source.

id

resume

section

Streams

name

Stream.resume()

api

public

public scan(memo: U, x: (memo: U, x: R) => U): Stream

Like reduce, but emits each intermediate value of the reduction as it is calculated.

id

scan

section

Streams

name

Stream.scan(memo, iterator)

api

public

Parameters

  • memo: U
    • the initial state of the reduction
  • x: (memo: U, x: R) => U

Returns

Stream

public scan1(memo: U, x: (memo: U, x: R) => U): Stream

Same as scan, but uses the first element as the initial state instead of passing in a memo value.

id

scan1

section

Streams

name

Stream.scan1(iterator)

api

public

_([1, 2, 3, 4]).scan1(add) // => 1, 3, 6, 10

Parameters

  • memo: U
  • x: (memo: U, x: R) => U

Returns

Stream
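The difference between reduce1 and scan1 is easiest to see side by side. The sketch below models both on a plain array, assuming each output array lists the values the resulting Stream would emit; `reduce1Model`, `scan1Model` and `add` are hypothetical helpers, not library functions.

```typescript
const add = (a: number, b: number): number => a + b;

// Emits only the final result; the first element seeds the reduction.
function reduce1Model<T>(xs: T[], f: (memo: T, x: T) => T): T[] {
  if (xs.length === 0) return [];
  let memo = xs[0];
  for (let i = 1; i < xs.length; i++) memo = f(memo, xs[i]);
  return [memo];
}

// Emits every intermediate state, starting with the first element itself.
function scan1Model<T>(xs: T[], f: (memo: T, x: T) => T): T[] {
  const out: T[] = [];
  xs.forEach((x, i) => {
    out.push(i === 0 ? x : f(out[i - 1], x));
  });
  return out;
}

const reduced = reduce1Model([1, 2, 3, 4], add); // [10]
const scanned = scan1Model([1, 2, 3, 4], add); // [1, 3, 6, 10]
```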

public sequence(): Stream

Reads values from a Stream of Streams, emitting them on a single output Stream. This can be thought of as a flatten, just one level deep. Often used for resolving asynchronous actions such as an HTTP request or reading a file.

id

sequence

section

Streams

name

Stream.sequence()

api

public

Returns

Stream

public series(): Stream

An alias for the sequence method.

id

series

section

Streams

name

Stream.series()

api

public

Returns

Stream

public setMaxListeners(n: number)

Parameters

  • n: number

public stopOnError(f: (err: Error) => void): Stream

Like the errors method, but emits a Stream end marker after an Error is encountered.

id

stopOnError

section

Streams

name

Stream.stopOnError(f)

api

public

Parameters

  • f: (err: Error) => void
    • the function to handle an error

Returns

Stream

public take(n: number): Stream

Creates a new Stream with the first n values from the source.

id

take

section

Streams

name

Stream.take(n)

api

public

Parameters

  • n: number
    • integer representing number of values to read from source

Returns

Stream

public throttle(ms: number): Stream

Ensures that only one data event is pushed downstream (or into the buffer) every ms milliseconds. Any other values are dropped.

id

throttle

section

Streams

name

Stream.throttle(ms)

api

public

Parameters

  • ms: number
    • the minimum milliseconds between each value

Returns

Stream
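The throttle rule can be modelled on timestamped values in the same spirit as a rate limiter. The sketch assumes the first value is always let through and that later values are kept only if at least ms has elapsed since the last kept value; `StampedValue` and `throttleModel` are hypothetical names, and the library's exact timing may differ.

```typescript
interface StampedValue<T> {
  t: number; // arrival time in milliseconds
  value: T;
}

// Lets through at most one value per `ms` window and drops the rest.
function throttleModel<T>(events: StampedValue<T>[], ms: number): T[] {
  const out: T[] = [];
  let lastKept = -Infinity;
  for (const e of events) {
    if (e.t - lastKept >= ms) {
      out.push(e.value);
      lastKept = e.t;
    }
  }
  return out;
}

const throttled = throttleModel(
  [
    { t: 0, value: "a" },
    { t: 10, value: "b" }, // inside the window opened by "a": dropped
    { t: 120, value: "c" },
    { t: 150, value: "d" }, // inside the window opened by "c": dropped
  ],
  100
);
```

Unlike debounce, which waits for a quiet period and keeps the last value, this keeps the first value of each window.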

public toArray(f: (arr: R[]) => void)

Collects all values from a Stream into an Array and calls a function once with the result. This function causes a thunk.

If an error from the Stream reaches the toArray call, it will emit an error event (which will cause it to throw if unhandled).

id

toArray

section

Streams

name

Stream.toArray(f)

api

public

Parameters

  • f: (arr: R[]) => void
    • the callback to provide the completed Array to

public where(props: Object): Stream

A convenient form of filter, which returns all objects from a Stream that match a set of property values.

id

where

section

Streams

name

Stream.where(props)

api

public

Parameters

  • props: Object
    • the properties to match against

Returns

Stream
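A plain-array sketch of the matching rule may help here. It assumes an object matches when every property in props is strictly equal to the same property on the object; `WhereRow` and `whereModel` are hypothetical names, not part of the library.

```typescript
type WhereRow = Record<string, unknown>;

// Keeps the rows whose listed properties all equal the given values.
function whereModel(items: WhereRow[], props: WhereRow): WhereRow[] {
  return items.filter((item) =>
    Object.keys(props).every((k) => item[k] === props[k])
  );
}

const matches = whereModel(
  [
    { type: "blogpost", title: "foo" },
    { type: "blogpost", title: "bar" },
    { type: "comment", title: "foo" },
  ],
  { type: "blogpost", title: "foo" }
);
```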

public write(x: R in Highland.Stream<R>): boolean

Writes a value to the Stream. If the Stream is paused it will go into the Stream's incoming buffer, otherwise it will be immediately processed and sent to the Stream's consumers (if any). Returns false if the Stream is paused, true otherwise. This lets Node's pipe method handle back-pressure.

You shouldn't need to call this yourself, but it may be called by Node functions which treat Highland Streams as a Node Writable Stream.

id

write

section

Streams

name

Stream.write(x)

api

public

Parameters

  • x: R in Highland.Stream<R>
    • the value to write to the Stream

Returns

boolean
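The return value of write is what lets a producer respect back-pressure. The class below is a deliberately tiny model of that contract, not Highland's Stream: `BufferedSink` and its `delivered` array are hypothetical, and it only shows buffering while paused, the false/true return value, and flushing on resume.

```typescript
class BufferedSink<T> {
  private paused = true; // mirrors "all Highland Streams start in the paused state"
  private buffer: T[] = [];
  readonly delivered: T[] = []; // stands in for the Stream's consumers

  // Returns false while paused so a producer knows to stop writing.
  write(x: T): boolean {
    if (this.paused) {
      this.buffer.push(x);
      return false;
    }
    this.delivered.push(x);
    return true;
  }

  pause(): void {
    this.paused = true;
  }

  // Drains the incoming buffer first, in arrival order.
  resume(): void {
    this.paused = false;
    while (!this.paused && this.buffer.length > 0) {
      this.delivered.push(this.buffer.shift() as T);
    }
  }
}

const sink = new BufferedSink<number>();
const acceptedWhilePaused = sink.write(1); // false: buffered
sink.resume();
const acceptedWhileFlowing = sink.write(2); // true: delivered immediately
```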

public zip(ys: Array<R>): Stream

Takes two Streams and returns a Stream of corresponding pairs.

id

zip

section

Streams

name

Stream.zip(ys)

api

public

Parameters

  • ys: Array<R>
    • the other stream to combine values with

Returns

Stream

public zip(ys: Stream): Stream

Parameters

Returns

Stream
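Pairing "corresponding" values can be sketched on two arrays. The sketch assumes the output stops as soon as either input runs out; `zipModel` is a hypothetical helper used only for illustration.

```typescript
// Pairs the i-th element of xs with the i-th element of ys.
function zipModel<A, B>(xs: A[], ys: B[]): [A, B][] {
  const n = Math.min(xs.length, ys.length);
  const out: [A, B][] = [];
  for (let i = 0; i < n; i++) out.push([xs[i], ys[i]]);
  return out;
}

const pairs = zipModel(["a", "b", "c"], [1, 2]);
```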