📜  RxJs – 初学者指南

📅  最后修改于: 2022-05-13 01:56:38.087000             🧑  作者: Mango

RxJs – 初学者指南

你有没有想过如何在只有 4GB 内存的设备上打开一个 10GB 的文件?或者,Netflix 应用程序如何在您按下播放后立即在您的手机上播放云托管的 4K 电影?在具有 4GB RAM 的设备上,10GB 文件实际上是无限的。您的设备如何设法加载无限的东西?

它必须以小块的形式将文件加载到内存中,读取它们并丢弃它们,然后再将更多数据加载到内存中。它必须流式传输数据并以小块的形式对其进行处理。

什么是流?

流是可能无限的数据集合。这是一个超时输入的数据序列。它可以被认为是传送带上的物品一次处理一个。

Stream = Array + Infinity

由于数据可能是无限的,我们的可信赖循环不会对它们有效。您不能编写从零到无穷大的 for 循环来处理整个流。

Javascript
for (let i = 0; i < infinite; i++) {
  const element = stream[i];
}


Javascript
// Two states => resolve, reject
const promise = new Promise(resolve, reject);
  
promise
.then((data) => console.log("Data came back:" + data)) // Success
.catch((err) => console.error("No, Ew David", err)); // Error


Javascript
const observable = from([1, 2, 3, 4]);
  
// Three states => next, complete, error
observable.subscribe({
  next: (value) => console.log("Next value:", value),
  complete: () => console.log("Infinity is Done!!! ¯\_(ツ)_/¯ "),
  error: (err) => console.log("No, Ew Binod", err),
});


Javascript
of(10, 20, 30).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);


Javascript
from([10, 20, 30]).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);
  
console.log('----------')
  
const promise = fetchDataFromServer();
from(promise).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);


Javascript
// Another RxJS creation operator that
// starts at 0 and emits 1000 values
range(1, 1000) 
.pipe(map(x => x * 10))
.subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);


Javascript
range(1, 1000).pipe(mergeMap(pageNum => 
    fetchBulkDataFromServer({pageNum: pageNum})))
.pipe(map(bulkData=>`Page Num ${bulkData.page} 
    returned ${bulkData.items.length} items`))
.subscribe(
    next => console.log('next:', next),
    err => console.log('error:', err),
    () => console.log('the end'),
);


Javascript
const maxParallelApiCalls = 50;
range(1, 1000).pipe(mergeMap(pageNum =>
    fetchBulkDataFromServer({pageNum: pageNum}),
maxParallelApiCalls)).pipe(map(bulkData =>
    `Page Num ${bulkData.page} returned 
    ${bulkData.items.length} items`))
.subscribe(
    next => console.log('next:', next),
    err => console.log('error:', err),
    () => console.log('the end'),
);



问题是我们如何知道何时停止它。这就是 Observables 发挥作用的地方。

可观察的

Observables 可能是无限的集合,它们一次异步地返回一个值。即,在返回的一个值和下一个值之间可能会经过一段时间。

Observable = Array + Infinity + Asynchronous
// OR
Observable = Promise + Returns many times
————— Value 1 ————— Value 2 ————— Value 3 ————— Value 4 —————|—>

在这种方式下,它们与Promises非常相似。 Promise 可以在一段时间后返回一个值。 Observables 返回潜在的无限值,每个值之间经过一段时间。

Javascript

// Two states => resolve, reject
const promise = new Promise(resolve, reject);
  
promise
.then((data) => console.log("Data came back:" + data)) // Success
.catch((err) => console.error("No, Ew David", err)); // Error


Promise 有两种可能的状态:解决、拒绝,或者换句话说:完成、错误。

Javascript

const observable = from([1, 2, 3, 4]);
  
// Three states => next, complete, error
observable.subscribe({
  next: (value) => console.log("Next value:", value),
  complete: () => console.log("Infinity is Done!!! ¯\_(ツ)_/¯ "),
  error: (err) => console.log("No, Ew Binod", err),
});


Observables 为同一个概念添加了一个额外的状态:下一个、完成、错误。 JavaScript 中最流行的 Observable 库之一是 RxJS。使 RxJS 很棒的不仅仅是 Observables 的概念,还有广泛的Operators数组。这些 Operator 可以对 Observable 采取行动,以允许以声明的方式轻松组合复杂的异步代码。

RxJs 运算符

在 RxJS 中,运算符是接受 Observable 作为输入的函数,在其上运行一些转换,并返回新转换的 Observable 作为输出。这些运算符(大部分)是纯的、无副作用的函数;也就是说,它们不会以任何方式改变传入的 Observable。这使得 Operators 可链接或可管道;允许将应用程序中的异步逻辑分解为可管理的小块。

RxJs 将其 Operator 分为几类,但最常用的 Operator 是Creation OperatorsTransformation Operators 。在本指南中,我们将探讨如何创建 Observable、如何转换以及如何使用 Observables 发出的数据。

创建运算符: of

这是从静态数据创建 Observable 的最简单方法。它是一个易于使用的包装器,它接受任何数据序列作为输入并返回一个可以使用的 Observable。这个运算符可以方便地启动一个全新的 Observable 管道。

Javascript

of(10, 20, 30).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);


输出:

'next: 10'
'next: 20'
'next: 30'
the end

创建运算符:from

此运算符类似于 `of`,但它适用于可迭代数据,即它接受数据集合并返回一个 Observable,该 Observable 一次发出一个集合的每个值。这个 Operator 的真正强大之处在于它还可以接受异步迭代,如生成器、promise 和其他 Observable。

Javascript

from([10, 20, 30]).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);
  
console.log('----------')
  
const promise = fetchDataFromServer();
from(promise).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);


输出:

'next: 10'
'next: 20'
'next: 30'
the end
----------
'next: {msg: "Hello world!"}'
the end

变换算子:map

该运算符与Array#map 非常相似。它接受 observable 发出的每个新值,对其进行转换,并将其传递给管道中的下一个 Operator。这就是 Streams 和 Observables 的概念基础开始大放异彩的地方。

当同样的问题可以使用Array#map解决时,为什么还要费力学习这个全新的概念呢?当我们根本无法将整个数据集加载到数组中时(即数据实际上是无限的),可观察对象就派上用场了。或者当我们没有预先提供给我们的整个数据集时。如,数据集是异步的,新值通过网络缓慢传入。或者很多时候我们都有这两个问题的意思,实际上无限的数据一次通过网络缓慢地传入几个值。

Javascript

// Another RxJS creation operator that
// starts at 0 and emits 1000 values
range(1, 1000) 
.pipe(map(x => x * 10))
.subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);

输出:

'next: 10'
'next: 20'
'next: 30'
....
....
....
'next: 10000'
the end

RxJs运算符几乎总是纯/无副作用的,它们一次使用一个发射值。这使得处理有效的无限数据集变得非常容易。由于该函数没有副作用,因此系统不必保留当前未处理的项目。即一次只有一个项目保存在内存中。

转换运算符:mergeMap

此运算符与map非常相似,但它的转换函数返回异步数据(Observables 或 Promises)。这使得处理对服务器或数据库的许多异步调用非常容易,甚至允许我们并行化这些调用。

Javascript

range(1, 1000).pipe(mergeMap(pageNum => 
    fetchBulkDataFromServer({pageNum: pageNum})))
.pipe(map(bulkData=>`Page Num ${bulkData.page} 
    returned ${bulkData.items.length} items`))
.subscribe(
    next => console.log('next:', next),
    err => console.log('error:', err),
    () => console.log('the end'),
);

输出:

'next: Page Num 1 returned 100 items'
'next: Page Num 2 returned 90 items'
'next: Page Num 3 returned 70 items'
'next: Page Num 4 returned 100 items'
....
....
'next: Page Num 1000 returned 30 items'
the end

由于mergeMap正在映射异步数据(Observables),它通过并行映射多个 Observables 显着加快了速度。它接受第二个参数“并发计数”,它定义了并行运行的 Observable 数量。在不使用 Observables 的情况下实现这种级别的并行异步处理并不是一项简单的任务,并且很容易导致难以调试的并发错误。

Javascript

const maxParallelApiCalls = 50;
range(1, 1000).pipe(mergeMap(pageNum =>
    fetchBulkDataFromServer({pageNum: pageNum}),
maxParallelApiCalls)).pipe(map(bulkData =>
    `Page Num ${bulkData.page} returned 
    ${bulkData.items.length} items`))
.subscribe(
    next => console.log('next:', next),
    err => console.log('error:', err),
    () => console.log('the end'),
);

输出:

'next: Page Num 7 returned 10 items'
'next: Page Num 12 returned 8 items'
'next: Page Num 38 returned 12 items'
'next: Page Num 3 returned 70 items'
....
....
'next: Page Num 1000 returned 30 items'
the end

在上面的例子中,RxJs 开始同时处理 50 个 observable,并按照完成的顺序发出这些 Observable 返回的数据。因此,无论哪个 API 调用首先返回其数据,都会通过管道传送到下一个 Operator。这是mergeMap如何并行化异步数据的时间线可视化。

 — Value 1 —————————————————————————————————|—>
 ————————————————————— Value 2 —————————————|—>
 ——————————— Value 3 ———————————————————————|—>
 —————————————————————————————— Value 4 ————|—>
 —————————————————Value 5 ——————————————————|—>
——————————————————————— Merge ————————————————————————
 — Value 1 —— Value 3 —— Value 5 —— Value 2 —— Value 4 —|—>

结论:以上示例涵盖了本初学者指南中的一些运算符,但 RxJS 有更多,适用于各种用例。查看他们的文档以探索更多信息。

参考: https://rxjs.dev/api