1. Observable 详细用法
Observable 是 RxJS 的核心概念,代表一个可观察的数据流。
创建和订阅 Observable
import { Observable } from "rxjs";
// 1. 创建Observable
const myObservable = new Observable(subscriber => {
// 发出三个值
subscriber.next('第一个值');
subscriber.next('第二个值');
subscriber.next('第三个值');
// 模拟异步操作
setTimeout(() => {
subscriber.next('异步值');
subscriber.complete(); // 完成流
}, 1000);
// 可选的清理逻辑
return () => {
console.log('Observable被取消订阅');
};
});
// 2. 订阅Observable
const subscription = myObservable.subscribe({
next: value => console.log('收到值:', value),
error: err => console.error('发生错误:', err),
complete: () => console.log('流已完成')
});
// 3. 取消订阅 (通常在组件销毁时调用)
setTimeout(() => {
subscription.unsubscribe();
}, 2000);
/* 输出顺序:
收到值: 第一个值
收到值: 第二个值
收到值: 第三个值
(等待1秒)
收到值: 异步值
流已完成
(再等待1秒)
Observable被取消订阅
*/
2. of 操作符详细用法
of
用于创建一个会立即发出给定参数的 Observable。
基本示例
import { of } from "rxjs";
// 发出固定值
of('苹果', '香蕉', '橙子').subscribe({
next: fruit => console.log('水果:', fruit),
complete: () => console.log('水果列表结束')
});
/* 输出:
水果: 苹果
水果: 香蕉
水果: 橙子
水果列表结束
*/
// 发出不同类型的数据
of(
'字符串',
123,
true,
{name: 'Alice'},
[1, 2, 3],
function hello() { return 'world'; }
).subscribe(val => console.log('收到的值:', val));
// 实际应用:模拟API返回
function mockApiCall() {
return of({id: 1, name: '用户1'});
}
mockApiCall().subscribe(user => {
console.log('用户数据:', user);
});
3. from 操作符详细用法
from
可以将多种数据类型转换为 Observable。
各种来源的转换
import { from } from "rxjs";
// 1. 从数组创建
from([10, 20, 30]).subscribe(num => console.log('数字:', num));
// 2. 从Promise创建
const promise = fetch('https://api.example.com/data')
.then(response => response.json());
from(promise).subscribe(data => {
console.log('API数据:', data);
});
// 3. 从字符串创建 (每个字符作为单独的值)
from('Hello').subscribe(char => console.log(char));
// 输出: H, e, l, l, o
// 4. 从Map或Set创建
const myMap = new Map();
myMap.set('name', 'Alice');
myMap.set('age', 25);
from(myMap).subscribe(entry => {
console.log('Map条目:', entry);
// 输出: ['name', 'Alice'], ['age', 25]
});
// 5. 实际应用:批量处理数组
const userIds = [1, 2, 3, 4];
from(userIds).subscribe(id => {
console.log('处理用户ID:', id);
// 这里可以调用API获取每个用户的详细信息
});
4. forkJoin 操作符详细用法
forkJoin
用于并行执行多个 Observable,等待它们全部完成。
完整示例
import { forkJoin, of, from, throwError } from "rxjs";
import { delay, catchError } from "rxjs/operators";
// 模拟API函数
function getUser(id) {
return of({ id, name: `用户${id}` }).pipe(delay(1000));
}
function getUserPosts(userId) {
const posts = [
{ id: 1, title: '帖子1' },
{ id: 2, title: '帖子2' }
];
return of(posts).pipe(delay(1500));
}
function getUserComments(userId) {
return from(fetch(`https://api.example.com/users/${userId}/comments`));
}
// 1. 基本用法
forkJoin([
getUser(1),
getUserPosts(1),
getUserComments(1).pipe(
catchError(error => of(`获取评论失败: ${error.message}`))
)
]).subscribe({
next: ([user, posts, comments]) => {
console.log('用户:', user);
console.log('帖子:', posts);
console.log('评论:', comments);
},
error: err => console.error('整体失败:', err),
complete: () => console.log('所有请求完成')
});
// 2. 对象形式更清晰
forkJoin({
user: getUser(1),
posts: getUserPosts(1),
comments: getUserComments(1).pipe(
catchError(error => of([])) // 错误时返回空数组
)
}).subscribe({
next: result => {
console.log('整合结果:', result);
// 结构: { user: {...}, posts: [...], comments: [...] }
}
});
// 3. 错误处理演示
forkJoin({
success: of('成功'),
failure: throwError(new Error('出错了'))
}).pipe(
catchError(error => {
console.log('捕获到错误:', error);
return of({ success: null, failure: error.message });
})
).subscribe(result => {
console.log('最终结果:', result);
});
// 4. 实际应用:并行请求多个API
function loadDashboardData() {
return forkJoin({
user: getUser(1),
notifications: from(fetch('/api/notifications')),
settings: from(fetch('/api/settings'))
});
}
loadDashboardData().subscribe(data => {
console.log('仪表盘数据:', data);
// 更新UI...
});
综合实战示例
import { forkJoin, from, of } from "rxjs";
import { map, mergeMap, catchError } from "rxjs/operators";
// 模拟API服务
class ApiService {
static getUsers() {
const users = [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
];
return of(users).pipe(delay(500));
}
static getUserDetails(userId) {
const details = {
1: { age: 25, email: 'alice@example.com' },
2: { age: 30, email: 'bob@example.com' }
};
return of(details[userId]).pipe(delay(300));
}
static getUserPosts(userId) {
const posts = {
1: [{ id: 101, title: 'Alice的第一篇帖子' }],
2: [{ id: 201, title: 'Bob的帖子' }, { id: 202, title: 'Bob的另一篇帖子' }]
};
return of(posts[userId] || []).pipe(delay(700));
}
}
// 1. 获取所有用户及其详细信息和帖子
ApiService.getUsers().pipe(
mergeMap(users => {
// 为每个用户创建请求数组
const userRequests = users.map(user =>
forkJoin({
details: ApiService.getUserDetails(user.id),
posts: ApiService.getUserPosts(user.id)
}).pipe(
map(data => ({ ...user, ...data }))
)
);
// 并行执行所有用户请求
return forkJoin(userRequests);
})
).subscribe({
next: completeUsers => {
console.log('完整用户数据:', completeUsers);
/* 输出:
[
{
id: 1,
name: 'Alice',
details: { age: 25, email: 'alice@example.com' },
posts: [{ id: 101, title: 'Alice的第一篇帖子' }]
},
{
id: 2,
name: 'Bob',
details: { age: 30, email: 'bob@example.com' },
posts: [{...}, {...}]
}
]
*/
},
error: err => console.error('获取用户数据失败:', err)
});
// 2. 实际应用:表单提交后并行更新多个资源
function updateResources(userData, postsData, settingsData) {
return forkJoin({
user: from(fetch('/api/user', {
method: 'PUT',
body: JSON.stringify(userData)
})),
posts: from(fetch('/api/posts', {
method: 'POST',
body: JSON.stringify(postsData)
})),
settings: from(fetch('/api/settings', {
method: 'PATCH',
body: JSON.stringify(settingsData)
}))
}).pipe(
map(responses => ({
user: responses.user.json(),
posts: responses.posts.json(),
settings: responses.settings.json()
}))
);
}
// 使用示例
updateResources(
{ name: '新名字' },
[{ title: '新帖子' }],
{ theme: 'dark' }
).subscribe({
next: results => {
console.log('所有资源更新成功:', results);
},
error: err => {
console.error('更新失败:', err);
// 显示错误提示
}
});
这些示例展示了 RxJS 操作符在实际开发中的典型用法。关键点:
Observable
是基础,代表数据流of
用于创建简单的同步流from
用于从各种数据源创建流forkJoin
用于并行执行多个 Observable 并合并结果