Skip to content

fsspec安装与使用

fsspec

介绍

官方文档

fsspec (Filesystem Spec): 一个统一的文件系统接口库。它允许你使用相同的 API 来操作本地文件、S3 对象、HDFS 文件等。其核心优势之一就是对异步操作的原生支持。

fsspec 是目前 Python 生态中统一访问各种存储系统的事实标准,由 Dask 团队主导开发,被 pandas、xarray、intake、prefect 等广泛采用。

存储类型协议前缀说明
本地磁盘file:// 或直接路径默认支持
FTPftp://通过 fsspec[ftp]
SFTPsftp://需要 paramiko
HTTP/HTTPShttp://, https://只读
S3(AWS/MinIO)s3://s3fs(基于 fsspec)
Azure Blobaz://adlfs
Google Cloud Storagegcs://gcsfs
WebDAVwebdav://社区实现(如 webdav4 + fsspec wrapper)
内存memory://用于测试
ZIP/TARzip://, tar://虚拟文件系统

安装与配置

安装

shell
# 安装 fsspec, s3fs 用于 S3/MinIO
pip install fsspec s3fs

# 示例:如果你要操作 S3,需要安装 s3fs
# pip install s3fs

# 示例:如果你要操作 HTTP/HTTPS,需要安装 aiohttp
# pip install aiohttp

异步

fsspec 的 LocalFileSystem 不支持原生异步(即没有 _xxx 协程方法),但在 FastAPI 等异步框架中仍可通过线程池实现“伪异步”,避免阻塞事件循环。

  • 在 Python 中,本地文件(local file)的 I/O 本质上是阻塞的,因为操作系统层面的 read() / write() 系统调用是同步的。
  • 虽然不能“真异步”,但可以将本地文件 I/O 放到线程池中执行,避免阻塞 asyncio 事件循环。
    • run_in_executor 默认使用 ThreadPoolExecutor
    • 这样主线程(事件循环)不会被阻塞,可同时处理其他请求
python
from fastapi import FastAPI
import fsspec
import asyncio

app = FastAPI()

@app.get("/read-local")
async def read_local_file():
    loop = asyncio.get_running_loop()
    
    # 将同步的 fsspec 操作放到线程池中运行
    def _sync_read():
        fs = fsspec.filesystem("file")
        with fs.open("/path/to/file.txt", "r") as f:
            return f.read()
    
    content = await loop.run_in_executor(None, _sync_read)
    return {"content": content}

批量读取

批量读取多个本地文件?依然可用线程池并发。虽然不是“异步 I/O”,但通过多线程并发,也能提升吞吐量(尤其在多核 CPU 上)。

python
@app.get("/read-multiple")
async def read_multiple_files():
    paths = ["/a.txt", "/b.txt", "/c.txt"]
    
    def _read_file(path):
        with fsspec.open(path, "r") as f:
            return f.read()
    
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(None, _read_file, p) for p in paths]
    contents = await asyncio.gather(*tasks)
    
    return dict(zip(paths, contents))