柚木

Node.js 代码阅读笔记系列(2)fs.readFile() 的实现

那就先从 fs.readFie 开始

fs.readFile 是怎么工作的?

fs.readFile() 接收 3 个传参,分别是 path, options, callback。通过下面的代码可以看到,其中的 options 是一个可选的参数,callback 始终是取最后一个参数。path 支持路径字符或者文件标识符。

fs.readFile = function(path, options, callback) {
  // 接收最后一个参数作为传参
  callback = maybeCallback(arguments[arguments.length - 1]);
  // 初始化 options
  options = getOptions(options, { flag: 'r' });
	
	// getPathFromURL 
	
  if (handleError((path = getPathFromURL(path)), callback))
    return;
    // path 和 callback 校验, path 中不能存在 `\u0000`
  if (!nullCheck(path, callback))
    return;
	
	// 创建文件读取上下文实例
  var context = new ReadFileContext(callback, options.encoding);
  // 判断 path 是否是文件描述符
  context.isUserFd = isFd(path); 
  // 创建一个文件读取实例
  var req = new FSReqWrap();
  req.context = context;
  
  // 设置文件读取 after open 回调
  req.oncomplete = readFileAfterOpen;
	
	// path是文件描述符,直接在nextTick执行open的回调
  if (context.isUserFd) {
    process.nextTick(function() {
      req.oncomplete(null, path);
    });
    return;
  }
	
	// 使用 `path` 模块对路径参数 makeLong 处理
	// 调用 fs 模块的 open
  binding.open(pathModule._makeLong(path),
               stringToFlags(options.flag || 'r'),
               0o666,
               req);
};

readFileAfterOpen

这里 readFileAfterOpen() 是一个通用的回调参数, 主要进行 open 操作之后的异常处理以及调用下一步的 stat

function readFileAfterOpen (err, fd) {
  var context = this.context;
	// open 失败,执行回调
  if (err) {
    context.callback(err);
    return;
  }
	
	// 给上下文对象赋值文件标识符
  context.fd = fd;
	
	// 创建一个新的文件请求
  var req = new FSReqWrap();
  req.oncomplete = readFileAfterStat;
  req.context = context;
  binding.fstat(fd, req);
}

ReadFileContext

ReadFileContext() 是读取文件的上下文的构造器,它的实例会有当前读取文件的标识符,大小,编码,读取的位置等关键的属性。

ReadFileContext() 还有两个原型方法 read() 和 close()。

ReadFileContext.prototype.read = function() {
  ...
  var req = new FSReqWrap();
  req.oncomplete = readFileAfterRead;
  req.context = this;

  binding.read(this.fd, buffer, offset, length, -1, req);
};

ReadFileContext.prototype.close = function(err) {
  var req = new FSReqWrap();
  req.oncomplete = readFileAfterClose;
  req.context = this;
  this.err = err;

  if (this.isUserFd) {
    process.nextTick(function() {
      req.oncomplete(null);
    });
    return;
  }

  binding.close(this.fd, req);
};

readFileAfterRead() 绑定在 FSReqWrap 实例的 oncomplete 回调上,readFileAfterRead 会持续读取文件内容。

FSReqWrap

我们在上面的代码中见到所有涉及文件操作的回调的地方都看到了 FSReqWrap 的身影。下面我们来看看 FSReqWrap 是怎么实现的。

/src/node_file.cc

FSReqWrap 继承自 ReqWrap, ReqWrap 和上一篇文章提到的 HandleWrap 都是继承自 AsyncWrap。

class FSReqWrap: public ReqWrap<uv_fs_t> {
 public:
  enum Ownership { COPY, MOVE };

  inline static FSReqWrap* New(Environment* env,
                               Local<Object> req,
                               const char* syscall,
                               const char* data = nullptr,
                               enum encoding encoding = UTF8,
                               Ownership ownership = COPY);

  inline void Dispose();

  ...
};

我们回到最开始 JavaScript 部分,通过赋值req.oncomplete 实现的设置回调。那么 oncomplete() 是在上面时候执行的?

var req = new FSReqWrap();
req.context = context;
req.oncomplete = readFileAfterOpen;

Read

Read() 就是 process.binding('fs').read() 的实现, 这个实现是对read(2)的一个包装。 看到 Read() 的最后的 ASYNC_CALL() 和 SYNC_CALL(),差不多能得出结论,也就是实现 fs.readFile() 和 fs.readFileSync() 等同步和异步 文件系统API的实现基础。

static void Read(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);
  
  ...
  
  uv_buf_t uvbuf = uv_buf_init(const_cast<char*>(buf), len);
  
  req = args[5];

  if (req->IsObject()) {
    ASYNC_CALL(read, req, UTF8, fd, &uvbuf, 1, pos);
  } else {
    SYNC_CALL(read, 0, fd, &uvbuf, 1, pos)
    args.GetReturnValue().Set(SYNC_RESULT);
  }
}

uv_buf_t 是用于保存数据的单元,它被抽象成了 buffer 结构,只保存了指向真实数据的指针(uv_buf_t.base) 以及真实数据的长度 (uv_buf_t.len)

ASYNC_CALL

#define ASYNC_CALL(func, req, encoding, ...)                                  \
  ASYNC_DEST_CALL(func, req, nullptr, encoding, __VA_ARGS__)                  \

ASYNC_DEST_CALL

#define ASYNC_DEST_CALL(func, request, dest, encoding, ...)                   \
  Environment* env = Environment::GetCurrent(args);                           \
  CHECK(request->IsObject());                                                 \
  FSReqWrap* req_wrap = FSReqWrap::New(env, request.As<Object>(),             \
                                       #func, dest, encoding);                \
  int err = uv_fs_ ## func(env->event_loop(),                                 \
                           req_wrap->req(),                                   \
                           __VA_ARGS__,                                       \
                           After);                                            \
  req_wrap->Dispatched();                                                     \
  if (err < 0) {                                                              \
    uv_fs_t* uv_req = req_wrap->req();                                        \
    uv_req->result = err;                                                     \
    uv_req->path = nullptr;                                                   \
    After(uv_req);                                                            \
    req_wrap = nullptr;                                                       \
  } else {                                                                    \
    args.GetReturnValue().Set(req_wrap->persistent());                        \
  }

文件读写是通过 uv_fs_* 函数族和 uv_fs_t 结构体完成的。uv_fs_t 的 result 域保存了 uv_fs_open 回调函数打开的文件描述符。如果文件被正确地打开,我们可以开始读取了。

After

static void After(uv_fs_t *req) {
  FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
  CHECK_EQ(req_wrap->req(), req);
  req_wrap->ReleaseEarly();  // Free memory that's no longer used now.

  ...
  
  // 执行回调
  req_wrap->MakeCallback(env->oncomplete_string(), argc, argv);
	
  uv_fs_req_cleanup(req_wrap->req());
  req_wrap->Dispose();

函数 uv_fs_req_cleanup() 在文件系统操作结束后必须要被调用,用来回收在读写中分配的内存。

uv_fs_read

POST 是一个宏定义,他处理异步回调任务和同步任务。判断有异步回调的话,调用uv__work_submit() 将异步请求推入线程池。线程池最大数量限制是 128。

// deps/uv/src/unix/fs.c

int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
               uv_file file,
               const uv_buf_t bufs[],
               unsigned int nbufs,
               int64_t off,
               uv_fs_cb cb) {
  if (bufs == NULL || nbufs == 0)
    return -EINVAL;

  INIT(READ);
  req->file = file;

  req->nbufs = nbufs;
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(*bufs));

  if (req->bufs == NULL) {
    if (cb != NULL)
      uv__req_unregister(loop, req);
    return -ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));

  req->off = off;
  POST;
}

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)

uv__work_submit

uv__work_submit() 接收 eventloop 结构体,uv__work 结构体以及uv__fs_work,uv__fs_done 函数。

//  deps/uv/src/threadpool.c

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq);
}

static void post(QUEUE* q) {
  // 添加互斥锁
  uv_mutex_lock(&mutex);
  // 插入链表队尾
  QUEUE_INSERT_TAIL(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  // unlock
  uv_mutex_unlock(&mutex);
}

worker

worker 顺序取出队列的第一个任务,并执行 w->work(w),最后调用 uv_async_send() 。直至 exit_message 时退出。

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;

  (void) arg;

  for (;;) {
    uv_mutex_lock(&mutex);

    while (QUEUE_EMPTY(&wq)) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);

    if (q == &exit_message)
      uv_cond_signal(&cond);
    else {
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
                             executing. */
    }

    uv_mutex_unlock(&mutex);

    if (q == &exit_message)
      break;

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
  }
}

uv__async_send

uv_async_send() 的作用是通知 io_watcher 执行相应线程上的回调。

// deps/uv/src/unix/async.c
void uv__async_send(struct uv__async* wa) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = wa->wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = wa->io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

总结

我们通过以上的源码,从 fs.readFile() 深入到了 libuv 的 uv_fs* 函数以及 uv_async_send()。

虽然我们在写 Node.js 时是在一条主线程中, 我们不需要考虑变量的共享以及锁的问题。但当我们处理异步 IO 操作中,背后是多个线程处理异步 IO。