外观
fsspec安装与使用
fsspec
介绍
fsspec (Filesystem Spec): 一个统一的文件系统接口库。它允许你使用相同的 API 来操作本地文件、S3 对象、HDFS 文件等。其核心优势之一就是对异步操作的原生支持。
fsspec 是目前 Python 生态中统一访问各种存储系统的事实标准,由 Dask 团队主导开发,被 pandas、xarray、intake、prefect 等广泛采用。
| 存储类型 | 协议前缀 | 说明 |
|---|---|---|
| 本地磁盘 | file:// 或直接路径 | 默认支持 |
| FTP | ftp:// | 通过 fsspec[ftp] |
| SFTP | sftp:// | 需要 paramiko |
| HTTP/HTTPS | http://, https:// | 只读 |
| S3(AWS/MinIO) | s3:// | 需 s3fs(基于 fsspec) |
| Azure Blob | az:// | 需 adlfs |
| Google Cloud Storage | gcs:// | 需 gcsfs |
| WebDAV | webdav:// | 社区实现(如 webdav4 + fsspec wrapper) |
| 内存 | memory:// | 用于测试 |
| ZIP/TAR | zip://, tar:// | 虚拟文件系统 |
安装与配置
安装
shell
# 安装 fsspec, s3fs 用于 S3/MinIO
pip install fsspec s3fs
# 示例:如果你要操作 S3,需要安装 s3fs
# pip install s3fs
# 示例:如果你要操作 HTTP/HTTPS,需要安装 aiohttp
# pip install aiohttp异步
fsspec 的 LocalFileSystem 不支持原生异步(即没有 _xxx 协程方法),但在 FastAPI 等异步框架中仍可通过线程池实现“伪异步”,避免阻塞事件循环。
- 在 Python 中,本地文件(local file)的 I/O 本质上是阻塞的,因为操作系统层面的
read()/write()系统调用是同步的。 - 虽然不能“真异步”,但可以将本地文件 I/O 放到线程池中执行,避免阻塞 asyncio 事件循环。
run_in_executor默认使用ThreadPoolExecutor- 这样主线程(事件循环)不会被阻塞,可同时处理其他请求
python
from fastapi import FastAPI
import fsspec
import asyncio
app = FastAPI()
@app.get("/read-local")
async def read_local_file():
loop = asyncio.get_running_loop()
# 将同步的 fsspec 操作放到线程池中运行
def _sync_read():
fs = fsspec.filesystem("file")
with fs.open("/path/to/file.txt", "r") as f:
return f.read()
content = await loop.run_in_executor(None, _sync_read)
return {"content": content}批量读取
批量读取多个本地文件?依然可用线程池并发。虽然不是“异步 I/O”,但通过多线程并发,也能提升吞吐量(尤其在多核 CPU 上)。
python
@app.get("/read-multiple")
async def read_multiple_files():
paths = ["/a.txt", "/b.txt", "/c.txt"]
def _read_file(path):
with fsspec.open(path, "r") as f:
return f.read()
loop = asyncio.get_running_loop()
tasks = [loop.run_in_executor(None, _read_file, p) for p in paths]
contents = await asyncio.gather(*tasks)
return dict(zip(paths, contents))