Stream<T> abstract mixin
abstract mixin class Stream<T>
Annotations: @vmIsolateUnsendable
A source of asynchronous data events.
A Stream provides a way to receive a sequence of events. Each event is either a data event, also called an element of the stream, or an error event, which is a notification that something has failed. When a stream has emitted all its events, a single "done" event notifies the listener that the end has been reached.
You produce a stream by calling an async* function, which then returns a stream. Consuming that stream will lead the function to emit events until it ends, and the stream closes. You consume a stream either using an await for loop, which is available inside an async or async* function, or by forwarding its events directly using yield* inside an async* function. Example:
dart
Stream<T> optionalMap<T>(
    Stream<T> source, [T Function(T)? convert]) async* {
  if (convert == null) {
    yield* source;
  } else {
    await for (var event in source) {
      yield convert(event);
    }
  }
}
When this function is called, it immediately returns a Stream<T> object. Then nothing further happens until someone tries to consume that stream. At that point, the body of the async* function starts running.
If the convert function was omitted, the yield* will listen to the source stream and forward all events, data and errors, to the returned stream. When the source stream closes, the yield* is done, and the optionalMap function body ends too. This closes the returned stream.
If a convert is supplied, the function instead listens on the source stream and enters an await for loop which repeatedly waits for the next data event. On a data event, it calls convert with the value and emits the result on the returned stream. If no error events are emitted by the source stream, the loop ends when the source stream does, then the optionalMap function body completes, which closes the returned stream.
On an error event from the source stream, the await for re-throws that error, which breaks the loop. The error then reaches the end of the optionalMap function body, since it's not caught. That makes the error be emitted on the returned stream, which then closes.
The Stream class also provides functionality which allows you to manually listen for events from a stream, or to convert a stream into another stream or into a future.
The forEach function corresponds to the await for loop, just as Iterable.forEach corresponds to a normal for/in loop. Like the loop, it will call a function for each data event and break on an error.
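A minimal sketch of that correspondence, assuming two hypothetical helpers (sumWithForEach and sumWithAwaitFor are illustrative names, not part of the Stream API) that compute the same total in both styles:

```dart
/// Hypothetical helper: sums the data events using forEach.
Future<int> sumWithForEach(Stream<int> source) async {
  var total = 0;
  await source.forEach((value) {
    total += value;
  });
  return total;
}

/// Hypothetical helper: the equivalent await for loop.
Future<int> sumWithAwaitFor(Stream<int> source) async {
  var total = 0;
  await for (final value in source) {
    total += value;
  }
  return total;
}

Future<void> main() async {
  print(await sumWithForEach(Stream.fromIterable([1, 2, 3]))); // 6
  print(await sumWithAwaitFor(Stream.fromIterable([1, 2, 3]))); // 6
}
```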
The more low-level listen method is what every other method is based on. You call listen on a stream to tell it that you want to receive events, and to register the callbacks which will receive those events. When you call listen, you receive a StreamSubscription object which is the active object providing the events, and which can be used to stop listening again, or to temporarily pause events from the subscription.
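As a rough illustration of the listen workflow described above, the sketch below registers callbacks and then uses the returned StreamSubscription to stop listening after three data events. The helper name collectFirstThree and the sample values are assumptions made for this example only.

```dart
import 'dart:async';

/// Hypothetical helper: listens manually and cancels after three events.
Future<List<int>> collectFirstThree(Stream<int> source) {
  final received = <int>[];
  final done = Completer<List<int>>();
  late final StreamSubscription<int> subscription;
  subscription = source.listen(
    (value) {
      received.add(value);
      if (received.length == 3) {
        // Stop listening; no further events are delivered.
        subscription.cancel();
        done.complete(received);
      }
    },
    onError: (Object error, StackTrace stack) {
      if (!done.isCompleted) done.completeError(error, stack);
    },
    onDone: () {
      // The stream ended before three events arrived.
      if (!done.isCompleted) done.complete(received);
    },
  );
  return done.future;
}

Future<void> main() async {
  final values = await collectFirstThree(Stream.fromIterable([1, 2, 3, 4, 5]));
  print(values); // [1, 2, 3]
}
```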
There are two kinds of streams: "Single-subscription" streams and "broadcast" streams.
A single-subscription stream allows only a single listener during the whole lifetime of the stream. It doesn't start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more. The stream created by an async* function is a single-subscription stream, but each call to the function creates a new such stream.
Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.
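The restriction can be observed with a small sketch. It assumes a hypothetical async* function countTo and a hypothetical helper secondListenThrows, and relies on the second listen call throwing a StateError, which is how controller-backed single-subscription streams report the misuse:

```dart
/// Hypothetical single-subscription source created by an async* function.
Stream<int> countTo(int n) async* {
  for (var i = 1; i <= n; i++) {
    yield i;
  }
}

/// Returns true if a second listen call on [stream] throws a StateError.
Future<bool> secondListenThrows(Stream<int> stream) async {
  final first = stream.listen((_) {});
  await first.cancel();
  try {
    // Not allowed, even though the first subscription was canceled.
    stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await secondListenThrows(countTo(3))); // true
}
```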
Single-subscription streams are generally used for streaming chunks of larger contiguous data, like file I/O.
A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.
Broadcast streams are used for independent events/observers.
If several listeners want to listen to a single-subscription stream, use asBroadcastStream to create a broadcast stream on top of the non-broadcast stream.
On either kind of stream, stream transformations, such as where and skip, return the same type of stream as the one the method was called on, unless otherwise noted.
When an event is fired, the listener(s) at that time will receive the event. If a listener is added to a broadcast stream while an event is being fired, that listener will not receive the event currently being fired. If a listener is canceled, it immediately stops receiving events. Listening on a broadcast stream can be treated as listening on a new stream containing only the events that have not yet been emitted when the listen call occurs. For example, the first getter listens to the stream, then returns the first event that listener receives. This is not necessarily the first event emitted by the stream, but the first of the remaining events of the broadcast stream.
When the "done" event is fired, subscribers are unsubscribed before receiving the event. After the event has been sent, the stream has no subscribers. Adding new subscribers to a broadcast stream after this point is allowed, but they will just receive a new "done" event as soon as possible.
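The broadcast behavior described above can be sketched with a StreamController.broadcast. The function name lateListenerLog and the listener labels A and B are illustrative assumptions. Listener B subscribes after the first event was emitted and so never sees it, and a listener added after the stream is done only receives a done event.

```dart
import 'dart:async';

/// Hypothetical demo: returns a log of what each listener observed.
Future<List<String>> lateListenerLog() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast();

  controller.stream.listen((v) => log.add('A:$v'));
  controller.add(1);
  // Give the first event time to be delivered before B subscribes.
  await Future<void>.delayed(Duration.zero);

  controller.stream.listen((v) => log.add('B:$v'));
  controller.add(2);
  await controller.close();

  // Listening after the stream is done only yields a new done event.
  final lateCount = await controller.stream.length;
  log.add('late:$lateCount');
  return log;
}

Future<void> main() async {
  final log = await lateListenerLog();
  print(log.contains('A:1')); // true
  print(log.contains('B:1')); // false
  print(log.contains('B:2')); // true
  print(log.last); // late:0
}
```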
Stream subscriptions always respect "pause" requests. If necessary they need to buffer their input, but often, and preferably, they can simply request their input to pause too.
The default implementation of isBroadcast returns false. A broadcast stream inheriting from Stream must override isBroadcast to return true if it wants to signal that it behaves like a broadcast stream.
Constructors
Stream() const
const Stream()
Implementation
dart
const Stream();
Stream.empty() factory const
const factory Stream.empty({bool broadcast})
Creates an empty broadcast stream.
This is a stream which does nothing except sending a done event when it's listened to.
Example:
dart
const stream = Stream.empty();
stream.listen(
(value) {
throw "Unreachable";
},
onDone: () {
print('Done');
},
);
The stream defaults to being a broadcast stream, as reported by isBroadcast. This value can be changed by passing false as the broadcast parameter, which defaults to true.
The stream can be listened to more than once, whether it reports itself as broadcast or not.
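A short sketch of the last two points, assuming a hypothetical helper emptyStreamLengths. It passes broadcast: false, which the implementation signature marks as available since Dart 3.2, and then listens to the same stream twice through the length getter:

```dart
/// Hypothetical helper: listens twice to the same non-broadcast empty stream.
Future<List<int>> emptyStreamLengths() async {
  const stream = Stream<int>.empty(broadcast: false);
  final first = await stream.length; // First listener.
  final second = await stream.length; // A second listener is still allowed.
  return [first, second];
}

Future<void> main() async {
  const stream = Stream<int>.empty(broadcast: false);
  print(stream.isBroadcast); // false
  print(await emptyStreamLengths()); // [0, 0]
}
```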
Implementation
dart
const factory Stream.empty({@Since("3.2") bool broadcast}) = _EmptyStream<T>;
Stream.error() factory
factory Stream.error(Object error, [StackTrace? stackTrace])
Creates a stream which emits a single error event before completing.
This stream emits a single error event of error and stackTrace and then completes with a done event.
Example:
dart
Future<void> tryThings(Stream<int> data) async {
try {
await for (var x in data) {
print('Data: $x');
}
} catch (e) {
print(e);
}
}
tryThings(Stream<int>.error('Error')); // prints "Error".
The returned stream is effectively equivalent to one created by Future<T>.error(error, stackTrace).asStream(), or by (() async* { throw error; } ()), except that you can control the stack trace as well.
Implementation
dart
factory Stream.error(Object error, [StackTrace? stackTrace]) {
AsyncError(:error, :stackTrace) = _interceptUserError(error, stackTrace);
return (_AsyncStreamController<T>(null, null, null, null)
.._addError(error, stackTrace)
.._closeUnchecked())
.stream;
}
Stream.eventTransformed() factory
factory Stream.eventTransformed(
Stream<dynamic> source,
EventSink<dynamic> Function(EventSink<T> sink) mapSink,
)
Creates a stream where all events of an existing stream are piped through a sink-transformation.
The given mapSink closure is invoked when the returned stream is listened to. All events from the source are added into the event sink that is returned from the invocation. The transformation puts all transformed events into the sink the mapSink closure received during its invocation. Conceptually the mapSink creates a transformation pipe with the input sink being the returned EventSink and the output sink being the sink it received.
This constructor is frequently used to build transformers.
Example use for a duplicating transformer:
dart
class DuplicationSink implements EventSink<String> {
  final EventSink<String> _outputSink;
  DuplicationSink(this._outputSink);
  // Forward every data event twice.
  void add(String data) {
    _outputSink.add(data);
    _outputSink.add(data);
  }
  void addError(Object e, [StackTrace? st]) { _outputSink.addError(e, st); }
  void close() { _outputSink.close(); }
}
class DuplicationTransformer extends StreamTransformerBase<String, String> {
  // The type arguments are spelled out so that the override type-checks.
  Stream<String> bind(Stream<String> stream) =>
      Stream<String>.eventTransformed(
          stream, (EventSink<String> sink) => DuplicationSink(sink));
}
stringStream.transform(DuplicationTransformer());
The resulting stream is a broadcast stream if source is.
Implementation
dart
factory Stream.eventTransformed(
Stream<dynamic> source,
EventSink<dynamic> mapSink(EventSink<T> sink),
) {
return _BoundSinkStream(source, mapSink);
}
Stream.fromFuture() factory
factory Stream.fromFuture(Future<T> future)
Creates a new single-subscription stream from the future.
When the future completes, the stream will fire one event, either data or error, and then close with a done-event.
Example:
dart
Future<String> futureTask() async {
await Future.delayed(const Duration(seconds: 5));
return 'Future complete';
}
final stream = Stream<String>.fromFuture(futureTask());
stream.listen(print,
onDone: () => print('Done'), onError: print);
// Outputs:
// "Future complete" after 'futureTask' finished.
// "Done" when stream completed.
Implementation
dart
factory Stream.fromFuture(Future<T> future) {
// Use the controller's buffering to fill in the value even before
// the stream has a listener. For a single value, it's not worth it
// to wait for a listener before doing the `then` on the future.
_StreamController<T> controller = _SyncStreamController<T>(
null,
null,
null,
null,
);
future.then(
(value) {
controller._add(value);
controller._closeUnchecked();
},
onError: (error, stackTrace) {
controller._addError(error, stackTrace);
controller._closeUnchecked();
},
);
return controller.stream;
}
Stream.fromFutures() factory
Create a single-subscription stream from a group of futures.
The stream reports the results of the futures on the stream in the order in which the futures complete. Each future provides either a data event or an error event, depending on how the future completes.
If some futures have already completed when Stream.fromFutures is called, their results will be emitted in some unspecified order.
When all futures have completed, the stream is closed.
If futures is empty, the stream closes as soon as possible.
Example:
dart
Future<int> waitTask() async {
await Future.delayed(const Duration(seconds: 2));
return 10;
}
Future<String> doneTask() async {
await Future.delayed(const Duration(seconds: 5));
return 'Future complete';
}
final stream = Stream<Object>.fromFutures([doneTask(), waitTask()]);
stream.listen(print, onDone: () => print('Done'), onError: print);
// Outputs:
// 10 after 'waitTask' finished.
// "Future complete" after 'doneTask' finished.
// "Done" when stream completed.
Implementation
dart
factory Stream.fromFutures(Iterable<Future<T>> futures) {
_StreamController<T> controller = _SyncStreamController<T>(
null,
null,
null,
null,
);
int count = 0;
// Declare these as variables holding closures instead of as
// function declarations.
// This avoids creating a new closure from the functions for each future.
void onValue(T value) {
if (!controller.isClosed) {
controller._add(value);
if (--count == 0) controller._closeUnchecked();
}
}
void onError(Object error, StackTrace stack) {
if (!controller.isClosed) {
controller._addError(error, stack);
if (--count == 0) controller._closeUnchecked();
}
}
// The futures are already running, so start listening to them immediately
// (instead of waiting for the stream to be listened on).
// If we wait, we might not catch errors in the futures in time.
for (var future in futures) {
count++;
future.then(onValue, onError: onError);
}
// Use schedule microtask since controller is sync.
if (count == 0) scheduleMicrotask(controller.close);
return controller.stream;
}
Stream.fromIterable() factory
factory Stream.fromIterable(Iterable<T> elements)
Creates a stream that gets its data from elements.
The iterable is iterated when the stream receives a listener, and stops iterating if the listener cancels the subscription, or if the Iterator.moveNext method returns false or throws. Iteration is suspended while the stream subscription is paused.
If calling Iterator.moveNext on elements.iterator throws, the stream emits that error and then it closes. If reading Iterator.current on elements.iterator throws, the stream emits that error, but keeps iterating.
Can be listened to more than once. Each listener iterates elements independently.
Example:
dart
final numbers = [1, 2, 3, 5, 6, 7];
final stream = Stream.fromIterable(numbers);
Implementation
dart
factory Stream.fromIterable(Iterable<T> elements) =>
Stream<T>.multi((controller) {
Iterator<T> iterator;
try {
iterator = elements.iterator;
} catch (e, s) {
var error = _interceptCaughtError(e, s);
controller.addError(error.error, error.stackTrace);
controller.close();
return;
}
var zone = Zone.current;
var isScheduled = true;
void next() {
if (!controller.hasListener || controller.isPaused) {
// Cancelled or paused since scheduled.
isScheduled = false;
return;
}
bool hasNext;
try {
hasNext = iterator.moveNext();
} catch (e, s) {
var error = _interceptCaughtError(e, s);
controller.addErrorSync(error.error, error.stackTrace);
controller.closeSync();
return;
}
if (hasNext) {
try {
controller.addSync(iterator.current);
} catch (e, s) {
var error = _interceptCaughtError(e, s);
controller.addErrorSync(error.error, error.stackTrace);
}
if (controller.hasListener && !controller.isPaused) {
zone.scheduleMicrotask(next);
} else {
isScheduled = false;
}
} else {
controller.closeSync();
}
}
controller.onResume = () {
if (!isScheduled) {
isScheduled = true;
zone.scheduleMicrotask(next);
}
};
zone.scheduleMicrotask(next);
});
Stream.multi() factory
factory Stream.multi(
void Function(MultiStreamController<T>) onListen, {
bool isBroadcast = false,
})
Creates a multi-subscription stream.
Each time the created stream is listened to, the onListen callback is invoked with a new MultiStreamController, which forwards events to the StreamSubscription returned by that listen call.
This allows each listener to be treated as an individual stream.
The MultiStreamController does not support reading its StreamController.stream. Setting its StreamController.onListen has no effect since the onListen callback is called instead, and the StreamController.onListen won't be called later. The controller acts like an asynchronous controller, but provides extra methods for delivering events synchronously.
If isBroadcast is set to true, the returned stream's Stream.isBroadcast will be true. This has no effect on the stream behavior; it is up to the onListen function to act like a broadcast stream if it claims to be one.
A multi-subscription stream can behave like any other stream. If the onListen callback throws on every call after the first, the stream behaves like a single-subscription stream. If the stream emits the same events to all current listeners, it behaves like a broadcast stream.
It can also choose to emit different events to different listeners. For example, a stream which repeats the most recent non-null event to new listeners, could be implemented as this example:
dart
extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
Stream<T> repeatLatest() {
var done = false;
T? latest = null;
var currentListeners = <MultiStreamController<T>>{};
this.listen((event) {
latest = event;
for (var listener in [...currentListeners]) listener.addSync(event);
}, onError: (Object error, StackTrace stack) {
for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
}, onDone: () {
done = true;
latest = null;
for (var listener in currentListeners) listener.closeSync();
currentListeners.clear();
});
return Stream.multi((controller) {
if (done) {
controller.close();
return;
}
currentListeners.add(controller);
var latestValue = latest;
if (latestValue != null) controller.add(latestValue);
controller.onCancel = () {
currentListeners.remove(controller);
};
});
}
}
Implementation
dart
factory Stream.multi(
void Function(MultiStreamController<T>) onListen, {
bool isBroadcast = false,
}) {
return _MultiStream<T>(onListen, isBroadcast);
}
Stream.periodic() factory
Creates a stream that repeatedly emits events at period intervals.
The event values are computed by invoking computation. The argument to this callback is an integer that starts with 0 and is incremented for every event.
The period must be a non-negative Duration.
If computation is omitted, the event values will all be null.
The computation must not be omitted if the event type T does not allow null as a value.
Example:
dart
final stream =
Stream<int>.periodic(const Duration(
seconds: 1), (count) => count * count).take(5);
stream.forEach(print); // Outputs event values 0,1,4,9,16.
Implementation
dart
factory Stream.periodic(
Duration period, [
T computation(int computationCount)?,
]) {
if (computation == null && !typeAcceptsNull<T>()) {
throw ArgumentError.value(
null,
"computation",
"Must not be omitted when the event type is non-nullable",
);
}
var controller = _SyncStreamController<T>(null, null, null, null);
// Counts the time that the Stream was running (and not paused).
Stopwatch watch = Stopwatch();
controller.onListen = () {
int computationCount = 0;
void sendEvent(_) {
watch.reset();
if (computation != null) {
T event;
try {
event = computation(computationCount++);
} catch (e, s) {
var error = _interceptCaughtError(e, s);
controller.addError(error.error, error.stackTrace);
return;
}
controller.add(event);
} else {
controller.add(null as T); // We have checked that null is T.
}
}
Timer timer = Timer.periodic(period, sendEvent);
controller
..onCancel = () {
timer.cancel();
return Future._nullFuture;
}
..onPause = () {
watch.stop();
timer.cancel();
}
..onResume = () {
Duration elapsed = watch.elapsed;
watch.start();
timer = Timer(period - elapsed, () {
timer = Timer.periodic(period, sendEvent);
sendEvent(null);
});
};
};
return controller.stream;
}
Stream.value() factory
factory Stream.value(T value)
Creates a stream which emits a single data event before closing.
This stream emits a single data event of value and then closes with a done event.
Example:
dart
Future<void> printThings(Stream<String> data) async {
await for (var x in data) {
print(x);
}
}
printThings(Stream<String>.value('ok')); // prints "ok".
The returned stream is effectively equivalent to one created by (() async* { yield value; } ()) or Future<T>.value(value).asStream().
Implementation
dart
factory Stream.value(T value) =>
(_AsyncStreamController<T>(null, null, null, null)
.._add(value)
.._closeUnchecked())
.stream;
Properties
first no setter
Future<T> get first
The first element of this stream.
Stops listening to this stream after the first element has been received.
Internally the method cancels its subscription after the first element. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this getter.
If an error event occurs before the first data event, the returned future is completed with that error.
If this stream is empty (a done event occurs before the first data event), the returned future completes with an error.
Except for the type of the error, this method is equivalent to this.elementAt(0).
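A minimal usage sketch, assuming a hypothetical helper describeFirst. It relies on the empty-stream error being a StateError, which is what IterableElementError.noElement() in the implementation below is understood to produce:

```dart
/// Hypothetical helper: reports the first element, or that there was none.
Future<String> describeFirst(Stream<int> stream) async {
  try {
    return 'first: ${await stream.first}';
  } on StateError {
    // An empty stream completes the returned future with an error.
    return 'no element';
  }
}

Future<void> main() async {
  print(await describeFirst(Stream.fromIterable([7, 8, 9]))); // first: 7
  print(await describeFirst(const Stream<int>.empty())); // no element
}
```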
Implementation
dart
Future<T> get first {
@pragma('vm:awaiter-link')
_Future<T> future = _Future<T>();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(future, error, stack);
},
cancelOnError: true,
);
subscription.onData((T value) {
_cancelAndValue(subscription, future, value);
});
return future;
}
hashCode no setter inherited
int get hashCode
The hash code for this object.
A hash code is a single integer which represents the state of the object that affects operator == comparisons.
All objects have hash codes. The default hash code implemented by Object represents only the identity of the object, the same way as the default operator == implementation only considers objects equal if they are identical (see identityHashCode).
If operator == is overridden to use the object state instead, the hash code must also be changed to represent that state, otherwise the object cannot be used in hash based data structures like the default Set and Map implementations.
Hash codes must be the same for objects that are equal to each other according to operator ==. The hash code of an object should only change if the object changes in a way that affects equality. There are no further requirements for the hash codes. They need not be consistent between executions of the same program and there are no distribution guarantees.
Objects that are not equal are allowed to have the same hash code. It is even technically allowed that all instances have the same hash code, but if clashes happen too often, it may reduce the efficiency of hash-based data structures like HashSet or HashMap.
If a subclass overrides hashCode, it should override the operator == operator as well to maintain consistency.
Inherited from Object.
Implementation
dart
external int get hashCode;
isBroadcast no setter
bool get isBroadcast
Whether this stream is a broadcast stream.
Implementation
dart
bool get isBroadcast => false;
isEmpty no setter
Whether this stream contains any elements.
Waits for the first element of this stream, then completes the returned future with false. If this stream ends without emitting any elements, the returned future is completed with true.
If the first event is an error, the returned future is completed with that error.
This operation listens to this stream, and a non-broadcast stream cannot be reused after checking whether it is empty.
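The three outcomes described above can be sketched as follows. The helper name describeIsEmpty and the error value 'boom' are assumptions made for illustration.

```dart
/// Hypothetical helper: reports the isEmpty result or the error it failed with.
Future<String> describeIsEmpty(Stream<int> stream) async {
  try {
    return 'isEmpty: ${await stream.isEmpty}';
  } catch (error) {
    // The first event was an error event.
    return 'error: $error';
  }
}

Future<void> main() async {
  print(await describeIsEmpty(Stream.fromIterable([1, 2]))); // isEmpty: false
  print(await describeIsEmpty(Stream<int>.empty())); // isEmpty: true
  print(await describeIsEmpty(Stream<int>.error('boom'))); // error: boom
}
```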
Implementation
dart
Future<bool> get isEmpty {
_Future<bool> future = _Future<bool>();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
future._complete(true);
},
cancelOnError: true,
);
subscription.onData((_) {
_cancelAndValue(subscription, future, false);
});
return future;
}
last no setter
Future<T> get last
The last element of this stream.
If this stream emits an error event, the returned future is completed with that error and processing stops.
If this stream is empty (the done event is the first event), the returned future completes with an error.
Implementation
dart
Future<T> get last {
@pragma('vm:awaiter-link')
_Future<T> future = _Future<T>();
late T result;
bool foundResult = false;
listen(
(T value) {
foundResult = true;
result = value;
},
onError: future._completeError,
onDone: () {
if (foundResult) {
future._complete(result);
return;
}
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(future, error, stack);
},
cancelOnError: true,
);
return future;
}
length no setter
The number of elements in this stream.
Waits for all elements of this stream. When this stream ends, the returned future is completed with the number of elements.
If this stream emits an error, the returned future is completed with that error, and processing stops.
This operation listens to this stream, and a non-broadcast stream cannot be reused after finding its length.
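As an illustration, the sketch below counts a finite stream and shows the error case. The names failsAfterTwo and describeLength, and the FormatException used as the failure, are assumptions for this example.

```dart
/// Hypothetical source: emits two elements, then an error.
Stream<int> failsAfterTwo() async* {
  yield 1;
  yield 2;
  throw FormatException('bad input');
}

/// Hypothetical helper: reports the length or the error that stopped counting.
Future<String> describeLength(Stream<int> stream) async {
  try {
    return 'length: ${await stream.length}';
  } on FormatException catch (e) {
    // The error event completes the future; the partial count is discarded.
    return 'failed: ${e.message}';
  }
}

Future<void> main() async {
  print(await describeLength(Stream.fromIterable([1, 2, 3]))); // length: 3
  print(await describeLength(failsAfterTwo())); // failed: bad input
}
```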
Implementation
dart
Future<int> get length {
_Future<int> future = _Future<int>();
int count = 0;
this.listen(
(_) {
count++;
},
onError: future._completeError,
onDone: () {
future._complete(count);
},
cancelOnError: true,
);
return future;
}
runtimeType no setter inherited
Type get runtimeType
A representation of the runtime type of the object.
Inherited from Object.
Implementation
dart
external Type get runtimeType;
single no setter
Future<T> get single
The single element of this stream.
If this stream emits an error event, the returned future is completed with that error and processing stops.
If this stream is empty or has more than one element, the returned future completes with an error.
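A small sketch of the three cases, assuming a hypothetical helper describeSingle. It treats both failures as StateError, which is what the IterableElementError helpers in the implementation below are understood to create.

```dart
/// Hypothetical helper: reports the single element, or that there was not
/// exactly one.
Future<String> describeSingle(Stream<int> stream) async {
  try {
    return 'single: ${await stream.single}';
  } on StateError {
    // Raised both for an empty stream and for a second element.
    return 'not exactly one element';
  }
}

Future<void> main() async {
  print(await describeSingle(Stream.value(42))); // single: 42
  print(await describeSingle(Stream<int>.empty())); // not exactly one element
  print(await describeSingle(Stream.fromIterable([1, 2]))); // not exactly one element
}
```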
Implementation
dart
Future<T> get single {
@pragma('vm:awaiter-link')
_Future<T> future = _Future<T>();
late T result;
bool foundResult = false;
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
if (foundResult) {
future._complete(result);
return;
}
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(future, error, stack);
},
cancelOnError: true,
);
subscription.onData((T value) {
if (foundResult) {
// This is the second element we get.
var stack = StackTrace.current;
var error = IterableElementError.tooMany();
_trySetStackTrace(error, stack);
_cancelAndErrorWithReplacement(subscription, future, error, stack);
return;
}
foundResult = true;
result = value;
});
return future;
}
Methods
any()
Checks whether test accepts any element provided by this stream.
Calls test on each element of this stream. If the call returns true, the returned future is completed with true and processing stops.
If this stream ends without finding an element that test accepts, the returned future is completed with false.
If this stream emits an error, or if the call to test throws, the returned future is completed with that error, and processing stops.
Example:
dart
final result =
await Stream.periodic(const Duration(seconds: 1), (count) => count)
.take(15)
.any((element) => element >= 5);
print(result); // true
Implementation
dart
Future<bool> any(bool test(T element)) {
_Future<bool> future = _Future<bool>();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
future._complete(false);
},
cancelOnError: true,
);
subscription.onData((T element) {
_runUserCode(() => test(element), (bool isMatch) {
if (isMatch) {
_cancelAndValue(subscription, future, true);
}
}, _cancelAndErrorClosure(subscription, future));
});
return future;
}
asBroadcastStream()
Stream<T> asBroadcastStream({
(void Function(StreamSubscription<T> subscription))? onListen,
(void Function(StreamSubscription<T> subscription))? onCancel,
})
Returns a multi-subscription stream that produces the same events as this.
The returned stream will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription.
If onListen is provided, it is called with a subscription-like object that represents the underlying subscription to this stream. It is possible to pause, resume or cancel the subscription during the call to onListen. It is not possible to change the event handlers, including using StreamSubscription.asFuture.
If onCancel is provided, it is called in a similar way to onListen when the returned stream stops having listeners. If it later gets a new listener, the onListen function is called again.
Use the callbacks, for example, for pausing the underlying subscription while having no subscribers to prevent losing events, or canceling the subscription when there are no listeners.
Cancelling is intended to be used when there are no current subscribers. If the subscription passed to onListen or onCancel is cancelled, then no further events are ever emitted by current subscriptions on the returned broadcast stream, not even a done event.
Example:
dart
final stream =
    Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
        .take(10);
final broadcastStream = stream.asBroadcastStream(
  // Both callbacks receive the underlying subscription to the source stream.
  onCancel: (subscription) {
    print('Stream paused');
    subscription.pause();
  },
  onListen: (subscription) {
    if (subscription.isPaused) {
      print('Stream resumed');
      subscription.resume();
    }
  },
);
final oddNumberStream = broadcastStream.where((event) => event.isOdd);
final oddNumberListener = oddNumberStream.listen(
  (event) {
    print('Odd: $event');
  },
  onDone: () => print('Done'),
);
final evenNumberStream = broadcastStream.where((event) => event.isEven);
final evenNumberListener = evenNumberStream.listen((event) {
  print('Even: $event');
}, onDone: () => print('Done'));
await Future.delayed(const Duration(milliseconds: 3500)); // 3.5 seconds
// Outputs:
// Even: 0
// Odd: 1
// Even: 2
oddNumberListener.cancel(); // Nothing printed.
evenNumberListener.cancel(); // "Stream paused"
await Future.delayed(const Duration(seconds: 2));
print(await broadcastStream.first); // "Stream resumed"
// Outputs:
// 3
Implementation
dart
Stream<T> asBroadcastStream({
void onListen(StreamSubscription<T> subscription)?,
void onCancel(StreamSubscription<T> subscription)?,
}) {
return _AsBroadcastStream<T>(this, onListen, onCancel);
}
asyncExpand()
Transforms each element into a sequence of asynchronous events.
Returns a new stream that, for each event of this stream, does the following:
- If the event is an error event or a done event, it is emitted directly by the returned stream.
- Otherwise it is an element. Then the convert function is called with the element as argument to produce a convert-stream for the element.
  - If that call throws, the error is emitted on the returned stream.
  - If the call returns null, no further action is taken for the element.
  - Otherwise, this stream is paused and convert-stream is listened to. Every data and error event of the convert-stream is emitted on the returned stream in the order it is produced. When the convert-stream ends, this stream is resumed.
The returned stream is a broadcast stream if this stream is.
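The steps above can be sketched with a small example. The names repeatLabel and expandOddNumbers are hypothetical. The convert callback returns null for even numbers, so those elements produce no output.

```dart
/// Hypothetical convert-stream: emits the label 'item n' n times.
Stream<String> repeatLabel(int n) async* {
  for (var i = 0; i < n; i++) {
    yield 'item $n';
  }
}

/// Hypothetical helper: expands odd numbers and skips even ones.
Future<List<String>> expandOddNumbers() {
  return Stream.fromIterable([1, 2, 3])
      // Returning null means no further action for that element.
      .asyncExpand((n) => n.isEven ? null : repeatLabel(n))
      .toList();
}

Future<void> main() async {
  print(await expandOddNumbers()); // [item 1, item 3, item 3, item 3]
}
```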
Implementation
dart
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) {
_StreamControllerBase<E> controller;
if (isBroadcast) {
controller = _SyncBroadcastStreamController<E>(null, null);
} else {
controller = _SyncStreamController<E>(null, null, null, null);
}
controller.onListen = () {
StreamSubscription<T> subscription = this.listen(
null,
onError: controller._addError, // Avoid Zone error replacement.
onDone: controller.close,
);
subscription.onData((T event) {
Stream<E>? newStream;
try {
newStream = convert(event);
} catch (e, s) {
var error = _interceptCaughtError(e, s);
controller.addError(error.error, error.stackTrace);
return;
}
if (newStream != null) {
subscription.pause();
controller.addStream(newStream).whenComplete(subscription.resume);
}
});
controller.onCancel = subscription.cancel;
if (!isBroadcast) {
controller
..onPause = subscription.pause
..onResume = subscription.resume;
}
};
return controller.stream;
}
asyncMap()
Creates a new stream with each data event of this stream asynchronously mapped to a new event.
This acts like map, in that the convert function is called once per data event, but here convert may be asynchronous and return a Future. If that happens, this stream waits for that future to complete before continuing with further events.
The returned stream is a broadcast stream if this stream is.
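A minimal sketch of the waiting behavior, assuming a hypothetical asynchronous lookup function in which earlier ids take longer to resolve. Because the source is paused while each future is pending, the results should still arrive in source order.

```dart
/// Hypothetical async conversion: lower ids take longer to complete.
Future<String> lookup(int id) async {
  await Future<void>.delayed(Duration(milliseconds: 10 * (4 - id)));
  return 'user$id';
}

/// Hypothetical helper: maps each id through the asynchronous lookup.
Future<List<String>> lookupAll() {
  return Stream.fromIterable([1, 2, 3]).asyncMap(lookup).toList();
}

Future<void> main() async {
  print(await lookupAll()); // [user1, user2, user3]
}
```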
Implementation
dart
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
_StreamControllerBase<E> controller;
if (isBroadcast) {
controller = _SyncBroadcastStreamController<E>(null, null);
} else {
controller = _SyncStreamController<E>(null, null, null, null);
}
controller.onListen = () {
StreamSubscription<T> subscription = this.listen(
null,
onError: controller._addError, // Avoid Zone error replacement.
onDone: controller.close,
);
FutureOr<Null> add(E value) {
controller.add(value);
}
final addError = controller._addError;
final resume = subscription.resume;
subscription.onData((T event) {
FutureOr<E> newValue;
try {
newValue = convert(event);
} catch (e, s) {
var error = _interceptCaughtError(e, s);
controller.addError(error.error, error.stackTrace);
return;
}
if (newValue is Future<E>) {
subscription.pause();
newValue.then(add, onError: addError).whenComplete(resume);
} else {
controller.add(newValue);
}
});
controller.onCancel = subscription.cancel;
if (!isBroadcast) {
controller
..onPause = subscription.pause
..onResume = resume;
}
};
return controller.stream;
}
cast()
Stream<R> cast<R>()
Adapt this stream to be a Stream<R>.
This stream is wrapped as a Stream<R> which checks at run-time that each data event emitted by this stream is also an instance of R.
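A short sketch of a typical use, assuming a stream that is statically typed as Stream<Object> but is known to carry only int values. The helper name sumAsInts is hypothetical.

```dart
// Views a loosely typed stream as Stream<int> and sums its elements.
// Each element is checked against int at run-time as it is delivered.
Future<int> sumAsInts(Stream<Object> source) =>
    source.cast<int>().fold<int>(0, (sum, n) => sum + n);

Future<void> main() async {
  final source = Stream<Object>.fromIterable([1, 2, 3]);
  print(await sumAsInts(source)); // 6
}
```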
Implementation
dart
Stream<R> cast<R>() => Stream.castFrom<T, R>(this);
contains()
Returns whether needle occurs in the elements provided by this stream.
Compares each element of this stream to needle using Object.==. If an equal element is found, the returned future is completed with true. If this stream ends without finding a match, the future is completed with false.
If this stream emits an error, or the call to Object.== throws, the returned future is completed with that error, and processing stops.
Example:
dart
final result = await Stream.fromIterable(['Year', 2021, 12, 24, 'Dart'])
.contains('Dart');
print(result); // true
Implementation
dart
Future<bool> contains(Object? needle) {
_Future<bool> future = _Future<bool>();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
future._complete(false);
},
cancelOnError: true,
);
subscription.onData((T element) {
_runUserCode(() => (element == needle), (bool isMatch) {
if (isMatch) {
_cancelAndValue(subscription, future, true);
}
}, _cancelAndErrorClosure(subscription, future));
});
return future;
}
distinct()
Skips data events if they are equal to the previous data event.
The returned stream provides the same events as this stream, except that it never provides two consecutive data events that are equal. That is, errors are passed through to the returned stream, and data events are passed through if they are distinct from the most recently emitted data event.
Equality is determined by the provided equals method. If that is omitted, the '==' operator on the last provided data element is used.
If equals throws, the data event is replaced by an error event containing the thrown error. The behavior is equivalent to the original stream emitting the error event, and it doesn't change what the most recently emitted data event is.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will individually perform the equals test.
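The equals parameter described above can be sketched as follows, here treating strings that differ only in case as equal. The helper name dropCaseRepeats is hypothetical.

```dart
// Skips consecutive elements that are equal when compared case-insensitively.
Future<List<String>> dropCaseRepeats(Stream<String> words) => words
    .distinct((previous, next) => previous.toLowerCase() == next.toLowerCase())
    .toList();

Future<void> main() async {
  final words = Stream.fromIterable(['a', 'A', 'b', 'B', 'b', 'a']);
  print(await dropCaseRepeats(words)); // [a, b, a]
}
```

The final 'a' is kept because it is compared with the most recently emitted element, 'b', not with every earlier element.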
Example:
dart
final stream = Stream.fromIterable([2, 6, 6, 8, 12, 8, 8, 2]).distinct();
stream.forEach(print); // Outputs events: 2,6,8,12,8,2.
Implementation
dart
Stream<T> distinct([bool equals(T previous, T next)?]) {
return _DistinctStream<T>(this, equals);
}
drain()
Future<E> drain<E>([E? futureValue])
Discards all data on this stream, but signals when it is done or an error occurred.
When subscribing using drain, cancelOnError will be true. This means that the future will complete with the first error on this stream and then cancel the subscription.
If this stream emits an error, the returned future is completed with that error, and processing is stopped.
In case of a done event the future completes with the given futureValue.
The futureValue must not be omitted if null is not assignable to E.
Example:
dart
final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // Outputs: 100.
Implementation
dart
Future<E> drain<E>([E? futureValue]) {
if (futureValue == null) {
futureValue = futureValue as E;
}
return listen(null, cancelOnError: true).asFuture<E>(futureValue);
}
elementAt()
Returns the value of the indexth data event of this stream.
Stops listening to this stream after the indexth data event has been received.
Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
If an error event occurs before the value is found, the future completes with this error.
If a done event occurs before the value is found, the future completes with a RangeError.
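A minimal sketch of both outcomes described above: a value when the index exists, and a RangeError when the stream ends first. The helper name elementOrNull is hypothetical and simply converts the error case to null.

```dart
// Returns the element at [index], or null if the stream ends before it.
Future<String?> elementOrNull(List<String> items, int index) async {
  try {
    return await Stream.fromIterable(items).elementAt(index);
  } on RangeError {
    // The stream sent its done event before reaching the requested index.
    return null;
  }
}

Future<void> main() async {
  print(await elementOrNull(['a', 'b', 'c', 'd'], 2)); // c
  print(await elementOrNull(['a', 'b'], 5)); // null
}
```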
Implementation
dart
Future<T> elementAt(int index) {
RangeError.checkNotNegative(index, "index");
@pragma('vm:awaiter-link')
_Future<T> result = _Future<T>();
int elementIndex = 0;
StreamSubscription<T> subscription;
subscription = this.listen(
null,
onError: result._completeError,
onDone: () {
result._completeError(
IndexError.withLength(
index,
elementIndex,
indexable: this,
name: "index",
),
StackTrace.current,
);
},
cancelOnError: true,
);
subscription.onData((T value) {
if (index == elementIndex) {
_cancelAndValue(subscription, result, value);
return;
}
elementIndex += 1;
});
return result;
}
every()
Checks whether test accepts all elements provided by this stream.
Calls test on each element of this stream. If the call returns false, the returned future is completed with false and processing stops.
If this stream ends without finding an element that test rejects, the returned future is completed with true.
If this stream emits an error, or if the call to test throws, the returned future is completed with that error, and processing stops.
Example:
dart
final result =
await Stream.periodic(const Duration(seconds: 1), (count) => count)
.take(15)
.every((x) => x <= 5);
print(result); // false
Implementation
dart
Future<bool> every(bool test(T element)) {
_Future<bool> future = _Future<bool>();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
future._complete(true);
},
cancelOnError: true,
);
subscription.onData((T element) {
_runUserCode(() => test(element), (bool isMatch) {
if (!isMatch) {
_cancelAndValue(subscription, future, false);
}
}, _cancelAndErrorClosure(subscription, future));
});
return future;
}
expand()
Transforms each element of this stream into a sequence of elements.
Returns a new stream where each element of this stream is replaced by zero or more data events. The event values are provided as an Iterable by a call to convert with the element as argument, and the elements of that iterable are emitted in iteration order. If calling convert throws, or if the iteration of the returned values throws, the error is emitted on the returned stream and iteration ends for that element of this stream.
Error events and the done event of this stream are forwarded directly to the returned stream.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will individually call convert and expand the events.
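As an illustration of replacing each element with zero or more events, the sketch below repeats every number n exactly n times. The helper name repeatEach is hypothetical.

```dart
// Replaces each number n with n copies of itself (zero copies for 0).
Future<List<int>> repeatEach(List<int> counts) => Stream.fromIterable(counts)
    .expand((n) => List.filled(n, n))
    .toList();

Future<void> main() async {
  print(await repeatEach([1, 0, 2, 3])); // [1, 2, 2, 3, 3, 3]
}
```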
Implementation
dart
Stream<S> expand<S>(Iterable<S> convert(T element)) {
return _ExpandStream<T, S>(this, convert);
}
firstWhere()
Finds the first element of this stream matching test.
Returns a future that is completed with the first element of this stream for which test returns true.
If no such element is found before this stream is done, and an orElse function is provided, the result of calling orElse becomes the value of the future. If orElse throws, the returned future is completed with that error.
If this stream emits an error before the first matching element, the returned future is completed with that error, and processing stops.
Stops listening to this stream after the first matching element or error has been received.
Internally the method cancels its subscription after the first element that matches the predicate. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
If an error occurs, or if this stream ends without finding a match and with no orElse function provided, the returned future is completed with an error.
Example:
dart
var result = await Stream.fromIterable([1, 3, 4, 9, 12])
.firstWhere((element) => element % 6 == 0, orElse: () => -1);
print(result); // 12
result = await Stream.fromIterable([1, 2, 3, 4, 5])
.firstWhere((element) => element % 6 == 0, orElse: () => -1);
print(result); // -1
Implementation
dart
Future<T> firstWhere(bool test(T element), {T orElse()?}) {
@pragma('vm:awaiter-link')
_Future<T> future = _Future();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
if (orElse != null) {
_runUserCode(orElse, future._complete, future._completeError);
return;
}
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(future, error, stack);
},
cancelOnError: true,
);
subscription.onData((T value) {
_runUserCode(() => test(value), (bool isMatch) {
if (isMatch) {
_cancelAndValue(subscription, future, value);
}
}, _cancelAndErrorClosure(subscription, future));
});
return future;
}
fold()
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)
Combines a sequence of values by repeatedly applying combine.
Similar to Iterable.fold, this function maintains a value, starting with initialValue and updated for each element of this stream. For each element, the value is updated to the result of calling combine with the previous value and the element.
When this stream is done, the returned future is completed with the value at that time. For an empty stream, the future is completed with initialValue.
If this stream emits an error, or the call to combine throws, the returned future is completed with that error, and processing is stopped.
Example:
dart
final result = await Stream.fromIterable([2, 6, 10, 8, 2])
.fold<int>(10, (previous, element) => previous + element);
print(result); // 38
Implementation
dart
Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
_Future<S> result = _Future<S>();
S value = initialValue;
StreamSubscription<T> subscription = this.listen(
null,
onError: result._completeError,
onDone: () {
result._complete(value);
},
cancelOnError: true,
);
subscription.onData((T element) {
_runUserCode(() => combine(value, element), (S newValue) {
value = newValue;
}, _cancelAndErrorClosure(subscription, result));
});
return result;
}
forEach()
Future<void> forEach(void Function(T element) action)
Executes action on each element of this stream.
Completes the returned Future when all elements of this stream have been processed.
If this stream emits an error, or if the call to action throws, the returned future completes with that error, and processing stops.
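A small sketch of both cases described above: normal completion, and an action that throws partway through. The helper name sumOf is hypothetical.

```dart
// Adds up all elements by running an action on each one.
Future<int> sumOf(Stream<int> numbers) async {
  var sum = 0;
  await numbers.forEach((n) {
    sum += n;
  });
  return sum;
}

Future<void> main() async {
  print(await sumOf(Stream.fromIterable([1, 2, 3]))); // 6

  try {
    // The action throws on the second element, so processing stops there
    // and the returned future completes with the thrown error.
    await Stream.fromIterable([1, 2, 3]).forEach((n) {
      if (n == 2) throw StateError('stop at $n');
    });
  } on StateError catch (e) {
    print(e.message); // stop at 2
  }
}
```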
Implementation
dart
Future<void> forEach(void action(T element)) {
_Future future = _Future();
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
future._complete(null);
},
cancelOnError: true,
);
subscription.onData((T element) {
_runUserCode<void>(
() => action(element),
(_) {},
_cancelAndErrorClosure(subscription, future),
);
});
return future;
}
handleError()
Creates a wrapper Stream that intercepts some errors from this stream.
If this stream sends an error that matches test, then it is intercepted by the onError function.
The onError callback must be of type void Function(Object error) or void Function(Object error, StackTrace). The function type determines whether onError is invoked with a stack trace argument. The stack trace argument may be StackTrace.empty if this stream received an error without a stack trace.
An asynchronous error, error, is matched by the test function if test(error) returns true. If test is omitted, every error is considered matching.
If the error is intercepted, the onError function can decide what to do with it. It can throw if it wants to raise a new (or the same) error, or simply return to make this stream forget the error. If the received error value is thrown again by the onError function, it acts like a rethrow and it is emitted along with its original stack trace, not the stack trace of the throw inside onError.
If you need to transform an error into a data event, use the more generic Stream.transform to handle the event by writing a data event to the output sink.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will individually perform the test and handle the error.
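The role of test can be sketched as follows: only FormatException errors are intercepted, and any other error passes through to the listener. The names failing and ignoreFormatErrors are hypothetical helpers for this illustration.

```dart
// Hypothetical source: emits one data event, then fails with [error].
Stream<int> failing(Object error) async* {
  yield 1;
  throw error;
}

// Intercepts FormatException errors only; other errors are not matched.
Future<List<int>> ignoreFormatErrors(Stream<int> source) => source
    .handleError(
      (Object error) => print('ignored: $error'),
      test: (error) => error is FormatException,
    )
    .toList();

Future<void> main() async {
  // The FormatException matches test, so it is intercepted and forgotten.
  print(await ignoreFormatErrors(failing(FormatException('bad input')))); // [1]

  try {
    // A StateError does not match test, so it reaches the listener.
    await ignoreFormatErrors(failing(StateError('other failure')));
  } on StateError catch (e) {
    print('passed through: ${e.message}'); // passed through: other failure
  }
}
```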
Example:
dart
Stream.periodic(const Duration(seconds: 1), (count) {
if (count == 2) {
throw Exception('Exceptional event');
}
return count;
}).take(4).handleError(print).forEach(print);
// Outputs:
// 0
// 1
// Exception: Exceptional event
// 3
// 4
Implementation
dart
Stream<T> handleError(Function onError, {bool test(error)?}) {
final void Function(Object, StackTrace) callback;
if (onError is void Function(Object, StackTrace)) {
callback = onError;
} else if (onError is void Function(Object)) {
callback = (Object error, StackTrace _) {
onError(error);
};
} else {
throw ArgumentError.value(
onError,
"onError",
"Error handler must accept one Object or one Object and a StackTrace"
" as arguments.",
);
}
return _HandleErrorStream<T>(this, callback, test);
}
join()
Combines the string representation of elements into a single string.
Each element is converted to a string using its Object.toString method. If separator is provided, it is inserted between element string representations.
The returned future is completed with the combined string when this stream is done.
If this stream emits an error, or the call to Object.toString throws, the returned future is completed with that error, and processing stops.
Example:
dart
final result = await Stream.fromIterable(['Mars', 'Venus', 'Earth'])
.join('--');
print(result); // 'Mars--Venus--Earth'
Implementation
dart
Future<String> join([String separator = ""]) {
_Future<String> result = _Future<String>();
StringBuffer buffer = StringBuffer();
bool first = true;
StreamSubscription<T> subscription = this.listen(
null,
onError: result._completeError,
onDone: () {
result._complete(buffer.toString());
},
cancelOnError: true,
);
subscription.onData(
separator.isEmpty
? (T element) {
try {
buffer.write(element);
} catch (e, s) {
_cancelAndErrorWithReplacement(subscription, result, e, s);
}
}
: (T element) {
if (!first) {
buffer.write(separator);
}
first = false;
try {
buffer.write(element);
} catch (e, s) {
_cancelAndErrorWithReplacement(subscription, result, e, s);
}
},
);
return result;
}
lastWhere()
Finds the last element in this stream matching test.
Returns a future that is completed with the last element of this stream for which test returns true.
If no such element is found before this stream is done, and an orElse function is provided, the result of calling orElse becomes the value of the future. If orElse throws, the returned future is completed with that error.
If this stream emits an error at any point, the returned future is completed with that error, and the subscription is canceled.
A non-error result cannot be provided before this stream is done.
Similar to firstWhere, except that the last matching element is found instead of the first.
Example:
dart
var result = await Stream.fromIterable([1, 3, 4, 7, 12, 24, 32])
.lastWhere((element) => element % 6 == 0, orElse: () => -1);
print(result); // 24
result = await Stream.fromIterable([1, 3, 4, 7, 12, 24, 32])
.lastWhere((element) => element % 10 == 0, orElse: () => -1);
print(result); // -1
Implementation
dart
Future<T> lastWhere(bool test(T element), {T orElse()?}) {
@pragma('vm:awaiter-link')
_Future<T> future = _Future();
late T result;
bool foundResult = false;
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
if (foundResult) {
future._complete(result);
return;
}
if (orElse != null) {
_runUserCode(orElse, future._complete, future._completeError);
return;
}
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(future, error, stack);
},
cancelOnError: true,
);
subscription.onData((T value) {
_runUserCode(() => test(value), (bool isMatch) {
if (isMatch) {
foundResult = true;
result = value;
}
}, _cancelAndErrorClosure(subscription, future));
});
return future;
}
listen()
StreamSubscription<T> listen(
(void Function(T event))? onData, {
Function? onError,
(void Function())? onDone,
bool? cancelOnError,
})
Adds a subscription to this stream.
Returns a StreamSubscription which handles events from this stream using the provided onData, onError and onDone handlers. The handlers can be changed on the subscription, but they start out as the provided functions.
On each data event from this stream, the subscriber's onData handler is called. If onData is null, nothing happens.
On errors from this stream, the onError handler is called with the error object and possibly a stack trace.
The onError callback must be of type void Function(Object error) or void Function(Object error, StackTrace). The function type determines whether onError is invoked with a stack trace argument. The stack trace argument may be StackTrace.empty if this stream received an error without a stack trace.
If onError is omitted, any errors on this stream are considered unhandled and are passed to the current Zone's error handler. By default, unhandled async errors are treated as if they were uncaught top-level errors.
If this stream closes and sends a done event, the onDone handler is called. If onDone is null, nothing happens.
If cancelOnError is true, the subscription is automatically canceled when the first error event is delivered. The default is false.
While a subscription is paused, or when it has been canceled, the subscription doesn't receive events and none of the event handler functions are called.
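The handlers described above can be sketched with a StreamController as the event source. The helper name logEvents is hypothetical; it records each event as a string so that the delivery order is visible.

```dart
import 'dart:async';

// Listens with all three handlers and records what each one receives.
Future<List<String>> logEvents() async {
  final controller = StreamController<int>();
  final done = Completer<void>();
  final log = <String>[];

  controller.stream.listen(
    (value) => log.add('data $value'),
    // Two-argument form, so the stack trace is passed as well.
    onError: (Object error, StackTrace stack) => log.add('error $error'),
    onDone: () {
      log.add('done');
      done.complete();
    },
    // With the default of false, the subscription survives the error.
    cancelOnError: false,
  );

  controller
    ..add(1)
    ..addError('oops')
    ..add(2);
  await controller.close();
  await done.future;
  return log;
}

Future<void> main() async {
  print(await logEvents()); // [data 1, error oops, data 2, done]
}
```

Passing cancelOnError: true instead would end the subscription at the error, so neither the later data event nor the done event would be delivered.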
Implementation
dart
StreamSubscription<T> listen(
void onData(T event)?, {
Function? onError,
void onDone()?,
bool? cancelOnError,
});
map()
Stream<S> map<S>(S Function(T event) convert)
Transforms each element of this stream into a new stream event.
Creates a new stream that converts each element of this stream to a new value using the convert function, and emits the result.
For each data event, o, in this stream, the returned stream provides a data event with the value convert(o). If convert throws, the returned stream reports it as an error event instead.
Error and done events are passed through unchanged to the returned stream.
The returned stream is a broadcast stream if this stream is. The convert function is called once per data event per listener. If a broadcast stream is listened to more than once, each subscription will individually call convert on each data event.
Unlike transform, this method does not treat the stream as chunks of a single value. Instead, each event is converted independently of the previous and following events, which may not always be correct. For example, UTF-8 encoding or decoding will give wrong results if a surrogate pair, or a multibyte UTF-8 encoding, is split across separate events and those events are encoded or decoded independently.
Example:
dart
final stream =
Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
.take(5);
final calculationStream =
stream.map<String>((event) => 'Square: ${event * event}');
calculationStream.forEach(print);
// Square: 0
// Square: 1
// Square: 4
// Square: 9
// Square: 16
Implementation
dart
Stream<S> map<S>(S convert(T event)) {
return _MapStream<T, S>(this, convert);
}
noSuchMethod() inherited
dynamic noSuchMethod(Invocation invocation)
Invoked when a nonexistent method or property is accessed.
A dynamic member invocation can attempt to call a member which doesn't exist on the receiving object. Example:
dart
dynamic object = 1;
object.add(42); // Statically allowed, run-time error
This invalid code will invoke the noSuchMethod method of the integer 1 with an Invocation representing the .add(42) call and arguments (which then throws).
Classes can override noSuchMethod to provide custom behavior for such invalid dynamic invocations.
A class with a non-default noSuchMethod invocation can also omit implementations for members of its interface. Example:
dart
class MockList<T> implements List<T> {
noSuchMethod(Invocation invocation) {
log(invocation);
super.noSuchMethod(invocation); // Will throw.
}
}
void main() {
MockList().add(42);
}
This code has no compile-time warnings or errors even though the MockList class has no concrete implementation of any of the List interface methods. Calls to List methods are forwarded to noSuchMethod, so this code will log an invocation similar to Invocation.method(#add, [42]) and then throw.
If a value is returned from noSuchMethod, it becomes the result of the original invocation. If the value is not of a type that can be returned by the original invocation, a type error occurs at the invocation.
The default behavior is to throw a NoSuchMethodError.
Inherited from Object.
Implementation
dart
@pragma("vm:entry-point")
@pragma("wasm:entry-point")
external dynamic noSuchMethod(Invocation invocation);
pipe()
Future<dynamic> pipe(StreamConsumer<T> streamConsumer)
Pipes the events of this stream into streamConsumer.
All events of this stream are added to streamConsumer using StreamConsumer.addStream. The streamConsumer is closed when this stream has been successfully added to it - when the future returned by addStream completes without an error.
Returns a future which completes when this stream has been consumed and the consumer has been closed.
The returned future completes with the same result as the future returned by StreamConsumer.close. If the call to StreamConsumer.addStream fails in some way, this method fails in the same way.
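A minimal sketch, using a StreamController as the consumer since it implements StreamConsumer through StreamSink. The helper name pipeThroughController is hypothetical.

```dart
import 'dart:async';

// Pipes [source] into a controller and returns what the controller emitted.
Future<List<int>> pipeThroughController(Stream<int> source) async {
  final controller = StreamController<int>();
  // Start collecting before piping so the controller has a listener.
  final collected = controller.stream.toList();

  // Adds every event of source to the controller, then closes it.
  await source.pipe(controller);

  return await collected;
}

Future<void> main() async {
  print(await pipeThroughController(Stream.fromIterable([1, 2, 3]))); // [1, 2, 3]
}
```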
Implementation
dart
Future pipe(StreamConsumer<T> streamConsumer) {
return streamConsumer.addStream(this).then((_) => streamConsumer.close());
}
reduce()
Future<T> reduce(T Function(T previous, T element) combine)
Combines a sequence of values by repeatedly applying combine.
Similar to Iterable.reduce, this function maintains a value, starting with the first element of this stream and updated for each further element of this stream. For each element after the first, the value is updated to the result of calling combine with the previous value and the element.
When this stream is done, the returned future is completed with the value at that time.
If this stream is empty, the returned future is completed with an error. If this stream emits an error, or the call to combine throws, the returned future is completed with that error, and processing is stopped.
Example:
dart
final result = await Stream.fromIterable([2, 6, 10, 8, 2])
.reduce((previous, element) => previous + element);
print(result); // 28
Implementation
dart
Future<T> reduce(T combine(T previous, T element)) {
@pragma('vm:awaiter-link')
_Future<T> result = _Future<T>();
bool seenFirst = false;
late T value;
StreamSubscription<T> subscription = this.listen(
null,
onError: result._completeError,
onDone: () {
if (!seenFirst) {
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(result, error, stack);
} else {
result._complete(value);
}
},
cancelOnError: true,
);
subscription.onData((T element) {
if (seenFirst) {
_runUserCode(() => combine(value, element), (T newValue) {
value = newValue;
}, _cancelAndErrorClosure(subscription, result));
} else {
value = element;
seenFirst = true;
}
});
return result;
}
singleWhere()
Finds the single element in this stream matching test.
Returns a future that is completed with the single element of this stream for which test returns true.
If no such element is found before this stream is done, and an orElse function is provided, the result of calling orElse becomes the value of the future. If orElse throws, the returned future is completed with that error.
Only one element may match. If more than one matching element is found an error is thrown, regardless of whether orElse was passed.
If this stream emits an error at any point, the returned future is completed with that error, and the subscription is canceled.
A non-error result cannot be provided before this stream is done.
Similar to lastWhere, except that it is an error if more than one matching element occurs in this stream.
Example:
dart
var result = await Stream.fromIterable([1, 2, 3, 6, 9, 12])
.singleWhere((element) => element % 4 == 0, orElse: () => -1);
print(result); // 12
result = await Stream.fromIterable([2, 6, 8, 12, 24, 32])
.singleWhere((element) => element % 9 == 0, orElse: () => -1);
print(result); // -1
result = await Stream.fromIterable([2, 6, 8, 12, 24, 32])
.singleWhere((element) => element % 6 == 0, orElse: () => -1);
// Throws.
Implementation
dart
Future<T> singleWhere(bool test(T element), {T orElse()?}) {
@pragma('vm:awaiter-link')
_Future<T> future = _Future<T>();
late T result;
bool foundResult = false;
StreamSubscription<T> subscription = this.listen(
null,
onError: future._completeError,
onDone: () {
if (foundResult) {
future._complete(result);
return;
}
if (orElse != null) {
_runUserCode(orElse, future._complete, future._completeError);
return;
}
var stack = StackTrace.current;
var error = IterableElementError.noElement();
_trySetStackTrace(error, stack);
_completeWithErrorCallback(future, error, stack);
},
cancelOnError: true,
);
subscription.onData((T value) {
_runUserCode(() => test(value), (bool isMatch) {
if (isMatch) {
if (foundResult) {
var stack = StackTrace.current;
var error = IterableElementError.tooMany();
_trySetStackTrace(error, stack);
_cancelAndErrorWithReplacement(subscription, future, error, stack);
return;
}
foundResult = true;
result = value;
}
}, _cancelAndErrorClosure(subscription, future));
});
return future;
}
skip()
Skips the first count data events from this stream.
Returns a stream that emits the same events as this stream would if listened to at the same time, except that the first count data events are not emitted. The returned stream is done when this stream is.
If this stream emits fewer than count data events before being done, the returned stream emits no data events.
The returned stream is a broadcast stream if this stream is. For a broadcast stream, the events are only counted from the time the returned stream is listened to.
Example:
dart
final stream =
Stream<int>.periodic(const Duration(seconds: 1), (i) => i).skip(7);
stream.forEach(print); // Skips events 0, ..., 6. Outputs events: 7, ...
Implementation
dart
Stream<T> skip(int count) {
return _SkipStream<T>(this, count);
}
skipWhile()
Skip data events from this stream while they are matched by test.
Returns a stream that emits the same events as this stream, except that data events are not emitted until a data event fails test. The test fails when called with a data event if it returns a non-true value or if the call to test throws. If the call throws, the error is emitted as an error event on the returned stream instead of the data event, otherwise the event that made test return non-true is emitted as the first data event.
Error and done events are provided by the returned stream unmodified.
The returned stream is a broadcast stream if this stream is. For a broadcast stream, the events are only tested from the time the returned stream is listened to.
Example:
dart
final stream = Stream<int>.periodic(const Duration(seconds: 1), (i) => i)
.take(10)
.skipWhile((x) => x < 5);
stream.forEach(print); // Outputs events: 5, ..., 9.
Implementation
dart
Stream<T> skipWhile(bool test(T element)) {
return _SkipWhileStream<T>(this, test);
}
take()
Provides at most the first count data events of this stream.
Returns a stream that emits the same events that this stream would if listened to at the same time, until either this stream ends or it has emitted count data events, at which point the returned stream is done.
If this stream produces fewer than count data events before it's done, so will the returned stream.
Starts listening to this stream when the returned stream is listened to and stops listening when the first count data events have been received.
This means that if this is a single-subscription (non-broadcast) stream, it cannot be reused after the returned stream has been listened to.
If this is a broadcast stream, the returned stream is a broadcast stream. In that case, the events are only counted from the time the returned stream is listened to.
Example:
dart
final stream =
Stream<int>.periodic(const Duration(seconds: 1), (i) => i)
.take(60);
stream.forEach(print); // Outputs events: 0, ... 59.
Implementation
dart
Stream<T> take(int count) {
return _TakeStream<T>(this, count);
}
takeWhile()
Forwards data events while test is successful.
Returns a stream that provides the same events as this stream until test fails for a data event. The returned stream is done when either this stream is done, or when this stream first emits a data event that fails test.
The test call is considered failing if it returns a non-true value or if it throws. If the test call throws, the error is emitted as the last event on the returned stream.
Stops listening to this stream after the accepted elements.
Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
The returned stream is a broadcast stream if this stream is. For a broadcast stream, the events are only tested from the time the returned stream is listened to.
Example:
dart
final stream = Stream<int>.periodic(const Duration(seconds: 1), (i) => i)
.takeWhile((event) => event < 6);
stream.forEach(print); // Outputs events: 0, ..., 5.
Implementation
dart
Stream<T> takeWhile(bool test(T element)) {
return _TakeWhileStream<T>(this, test);
}
timeout()
Creates a new stream with the same events as this stream.
When someone is listening on the returned stream and more than timeLimit passes without any event being emitted by this stream, the onTimeout function is called, which can then emit further events on the returned stream.
The countdown starts when the returned stream is listened to, and is restarted when an event from this stream is emitted, or when listening on the returned stream is paused and resumed. The countdown is stopped when listening on the returned stream is paused or cancelled. No new countdown is started when a countdown completes and the onTimeout function is called, even if events are emitted. If the delay between events of this stream is multiple times timeLimit, at most one timeout will happen between events.
The onTimeout function is called with one argument: an EventSink that allows putting events into the returned stream. This EventSink is only valid during the call to onTimeout. Calling EventSink.close on the sink passed to onTimeout closes the returned stream, and no further events are processed.
If onTimeout is omitted, a timeout will emit a TimeoutException into the error channel of the returned stream. If the call to onTimeout throws, the error is emitted as an error on the returned stream.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will have its own timer that starts counting on listen, and the subscriptions' timers can be paused individually.
Example:
dart
Future<String> waitTask() async {
return await Future.delayed(
const Duration(seconds: 4), () => 'Complete');
}
final stream = Stream<String>.fromFuture(waitTask())
.timeout(const Duration(seconds: 2), onTimeout: (controller) {
print('TimeOut occurred');
controller.close();
});
stream.listen(print, onDone: () => print('Done'));
// Outputs:
// TimeOut occurred
// Done
Implementation
dart
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {
  _StreamControllerBase<T> controller;
  if (isBroadcast) {
    controller = _SyncBroadcastStreamController<T>(null, null);
  } else {
    controller = _SyncStreamController<T>(null, null, null, null);
  }
  Zone zone = Zone.current;
  // Register callback immediately.
  _TimerCallback timeoutCallback;
  if (onTimeout == null) {
    timeoutCallback = () {
      controller.addError(
        TimeoutException("No stream event", timeLimit),
        null,
      );
    };
  } else {
    var registeredOnTimeout = zone.registerUnaryCallback<void, EventSink<T>>(
      onTimeout,
    );
    var wrapper = _ControllerEventSinkWrapper<T>(null);
    timeoutCallback = () {
      wrapper._sink = controller; // Only valid during call.
      zone.runUnaryGuarded(registeredOnTimeout, wrapper);
      wrapper._sink = null;
    };
  }
  // All further setup happens inside `onListen`.
  controller.onListen = () {
    Timer timer = zone.createTimer(timeLimit, timeoutCallback);
    var subscription = this.listen(null);
    // Set up event forwarding. Each data or error event resets the timer
    subscription
      ..onData((T event) {
        timer.cancel();
        timer = zone.createTimer(timeLimit, timeoutCallback);
        // Controller is synchronous, and the call might close the stream
        // and cancel the timer,
        // so create the Timer before calling into add();
        // issue: https://github.com/dart-lang/sdk/issues/37565
        controller.add(event);
      })
      ..onError((Object error, StackTrace stackTrace) {
        timer.cancel();
        timer = zone.createTimer(timeLimit, timeoutCallback);
        controller._addError(
          error,
          stackTrace,
        ); // Avoid Zone error replacement.
      })
      ..onDone(() {
        timer.cancel();
        controller.close();
      });
    // Set up further controller callbacks.
    controller.onCancel = () {
      timer.cancel();
      return subscription.cancel();
    };
    if (!isBroadcast) {
      controller
        ..onPause = () {
          timer.cancel();
          subscription.pause();
        }
        ..onResume = () {
          subscription.resume();
          timer = zone.createTimer(timeLimit, timeoutCallback);
        };
    }
  };
  return controller.stream;
}
toList()
Collects all elements of this stream in a List.
Creates a List<T> and adds all elements of this stream to the list in the order they arrive. When this stream ends, the returned future is completed with that list.
If this stream emits an error, the returned future is completed with that error, and processing stops.
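Both outcomes described above, completing with the list or completing with the error, can be observed with a short sketch. The `failing` stream and the `describe` helper are hypothetical names introduced for illustration.

```dart
/// Hypothetical source that emits two values and then fails.
Stream<int> failing() async* {
  yield 1;
  yield 2;
  throw StateError('boom');
}

/// Returns the collected list, or a message if the stream emitted an error.
Future<String> describe(Stream<int> stream) async {
  try {
    final list = await stream.toList();
    return 'list: $list';
  } on StateError catch (e) {
    return 'error: ${e.message}';
  }
}

Future<void> main() async {
  print(await describe(Stream.fromIterable([3, 1, 2]))); // list: [3, 1, 2]
  print(await describe(failing())); // error: boom
}
```

In the failing case the elements collected before the error (1 and 2) are not delivered to the caller; only the error is.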
Implementation
dart
Future<List<T>> toList() {
  List<T> result = <T>[];
  _Future<List<T>> future = _Future<List<T>>();
  this.listen(
    (T data) {
      result.add(data);
    },
    onError: future._completeError,
    onDone: () {
      future._complete(result);
    },
    cancelOnError: true,
  );
  return future;
}
toSet()
Collects the data of this stream in a Set.
Creates a Set<T> and adds all elements of this stream to the set in the order they arrive. When this stream ends, the returned future is completed with that set.
The returned set is the same type as created by <T>{}. If another type of set is needed, either use forEach to add each element to the set, or use toList().then((list) => SomeOtherSet.from(list)) to create the set.
If this stream emits an error, the returned future is completed with that error, and processing stops.
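As a rough illustration of the two options above, the sketch below collects the same events once with toSet and once into a different set type using forEach. `SplayTreeSet` from `dart:collection` is chosen here only as an example of "another type of set"; the helper names are hypothetical.

```dart
import 'dart:collection';

/// Hypothetical input containing duplicates.
Stream<int> numbers() => Stream<int>.fromIterable([3, 1, 3, 2, 1]);

/// Default behavior: duplicates are dropped, arrival order is kept.
Future<Set<int>> uniqueInArrivalOrder() => numbers().toSet();

/// Alternative set type, filled with forEach instead of toSet.
Future<SplayTreeSet<int>> uniqueSorted() async {
  final sorted = SplayTreeSet<int>();
  await numbers().forEach(sorted.add);
  return sorted;
}

Future<void> main() async {
  print(await uniqueInArrivalOrder()); // {3, 1, 2}
  print(await uniqueSorted()); // {1, 2, 3}
}
```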
Implementation
dart
Future<Set<T>> toSet() {
  Set<T> result = Set<T>();
  _Future<Set<T>> future = _Future<Set<T>>();
  this.listen(
    (T data) {
      result.add(data);
    },
    onError: future._completeError,
    onDone: () {
      future._complete(result);
    },
    cancelOnError: true,
  );
  return future;
}
toString() inherited
String toString()
A string representation of this object.
Some classes have a default textual representation, often paired with a static parse function (like int.parse). These classes will provide the textual representation as their string representation.
Other classes have no meaningful textual representation that a program will care about. Such classes will typically override toString to provide useful information when inspecting the object, mainly for debugging or logging.
Inherited from Object.
Implementation
dart
external String toString();
transform()
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer)
Applies streamTransformer to this stream.
Returns the transformed stream, that is, the result of streamTransformer.bind(this). This method simply allows writing the call to streamTransformer.bind in a chained fashion, like
dart
stream.map(mapping).transform(transformation).toList()
which can be more convenient than calling bind directly.
The streamTransformer can return any stream. Whether the returned stream is a broadcast stream or not, and which elements it will contain, is entirely up to the transformation.
This method should always be used for transformations which treat the entire stream as representing a single value which has perhaps been split into several parts for transport, like a file being read from disk or being fetched over a network. The transformation will then produce a new stream which transforms the stream's value incrementally (perhaps using Converter.startChunkedConversion). The resulting stream may again be chunks of the result, but does not have to correspond to specific events from the source stream.
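A typical case of a single value split into parts is text arriving as byte chunks. The sketch below chains `utf8.decoder` and `LineSplitter` from `dart:convert`; the chunk boundaries and the helper name `linesFromChunks` are made up for illustration.

```dart
import 'dart:convert';

/// Decodes byte chunks as UTF-8 and regroups the text into lines.
Future<List<String>> linesFromChunks(Stream<List<int>> chunks) {
  return chunks
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .toList();
}

Future<void> main() async {
  // Three byte chunks whose boundaries do not line up with line breaks.
  final chunks = Stream<List<int>>.fromIterable([
    utf8.encode('first li'),
    utf8.encode('ne\nsecond '),
    utf8.encode('line\n'),
  ]);
  print(await linesFromChunks(chunks)); // [first line, second line]
}
```

Three input events produce two output events here, which shows that the events of the transformed stream need not match the source events one to one.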
Implementation
dart
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
  return streamTransformer.bind(this);
}
where()
Creates a new stream from this stream that discards some elements.
The new stream sends the same error and done events as this stream, but it only sends the data events that satisfy the test.
If the test function throws, the data event is dropped and the error is emitted on the returned stream instead.
The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will individually perform the test.
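The case of a throwing test function can be sketched as follows. The test below deliberately throws for the value 3; the helper name `filterWithThrowingTest` and the log strings are hypothetical.

```dart
import 'dart:async';

/// Records what a listener sees when the test throws for one element.
Future<List<String>> filterWithThrowingTest() async {
  final log = <String>[];
  final done = Completer<void>();
  Stream<int>.fromIterable([1, 2, 3, 4]).where((n) {
    if (n == 3) throw ArgumentError('bad value $n');
    return n.isEven;
  }).listen(
    (n) => log.add('data $n'),
    onError: (Object error) => log.add('error'),
    onDone: () => done.complete(),
  );
  await done.future;
  return log;
}

Future<void> main() async {
  print(await filterWithThrowingTest()); // [data 2, error, data 4]
}
```

The error replaces the data event for 3, and the stream keeps going afterwards because the listener did not ask for cancelOnError.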
Example:
dart
final stream =
    Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
        .take(10);
final customStream = stream.where((event) => event > 3 && event <= 6);
customStream.listen(print); // Outputs event values: 4,5,6.
Implementation
dart
Stream<T> where(bool test(T event)) {
  return _WhereStream<T>(this, test);
}
Operators
operator ==() inherited
The equality operator.
The default behavior for all Objects is to return true if and only if this object and other are the same object.
Override this method to specify a different equality relation on a class. The overriding method must still be an equivalence relation. That is, it must be:
Total: It must return a boolean for all arguments. It should never throw.
Reflexive: For all objects o, o == o must be true.
Symmetric: For all objects o1 and o2, o1 == o2 and o2 == o1 must either both be true, or both be false.
Transitive: For all objects o1, o2, and o3, if o1 == o2 and o2 == o3 are true, then o1 == o3 must be true.
The method should also be consistent over time, so whether two objects are equal should only change if at least one of the objects was modified.
If a subclass overrides the equality operator, it should override the hashCode method as well to maintain consistency.
Inherited from Object.
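For a class that does override equality, a minimal sketch of keeping == and hashCode consistent might look like this. The `Point` class is hypothetical and not part of the Stream API.

```dart
/// Hypothetical value class with field-based equality.
class Point {
  final int x;
  final int y;
  Point(this.x, this.y);

  @override
  bool operator ==(Object other) =>
      other is Point && other.x == x && other.y == y;

  // Equal points must report equal hash codes.
  @override
  int get hashCode => Object.hash(x, y);
}

void main() {
  print(Point(1, 2) == Point(1, 2)); // true
  print(Point(1, 2) == Point(2, 1)); // false
  print({Point(1, 2), Point(1, 2)}.length); // 1
}
```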
Implementation
dart
external bool operator ==(Object other);
Static Methods
castFrom()
Adapts source to be a Stream<T>.
This allows source to be used at the new type, but at run-time it must satisfy the requirements of both the new type and its original type.
Data events created by the source stream must also be instances of T.
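A small sketch of the intended use, assuming a source typed as Stream<num> whose events all happen to be int values; the helper name `castInts` is hypothetical.

```dart
/// Views a Stream<num> as a Stream<int>; every event must actually be an int.
Future<List<int>> castInts() {
  final Stream<num> source = Stream<num>.fromIterable([1, 2, 3]);
  final Stream<int> ints = Stream.castFrom<num, int>(source);
  return ints.toList();
}

Future<void> main() async {
  print(await castInts()); // [1, 2, 3]
}
```

If the source emitted a value that is not an int, such as 1.5, that event would not satisfy the requirement above and the cast would fail at run time.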
Implementation
dart
static Stream<T> castFrom<S, T>(Stream<S> source) => CastStream<S, T>(source);