实现查询逻辑

再来重温一下之前提到的查询的算法,查询时从根节点开始,通过二分查找找到所属的节点或者通往子树的指针,重复这个过程直到抵达最后的叶子节点,然后继续二分查找看是否存在匹配的 Cell。整体来看还是比较容易实现的,首先为 Database 类实现一些初始化的逻辑和查询的接口:

export class Database {
  private readonly pager: Pager;
  private btree: BTree | null = null;

  constructor(filePath: string) {
    const isExist = fs.existsSync(filePath);
    const fd = fs.openSync(filePath, isExist ? 'r+' : 'w+');
    this.pager = new Pager(fd);
  }

  public open() {
    if (!this.pager.verifyFileHeader()) {
      throw new Error('This is not a simple db file!');
    }
    const cursor = new Cursor(this.pager);
    this.btree = new BTree(this.pager, cursor);
  }

  public get(key: string) {
    if (!this.btree) {
      throw new Error('call `db.open` first!');
    }
    const buf = this.btree.find(Buffer.from(key));
    return buf ? deserialize(buf) : null;
  }
}
export class Database {
  private readonly pager: Pager;
  private btree: BTree | null = null;

  constructor(filePath: string) {
    const isExist = fs.existsSync(filePath);
    const fd = fs.openSync(filePath, isExist ? 'r+' : 'w+');
    this.pager = new Pager(fd);
  }

  public open() {
    if (!this.pager.verifyFileHeader()) {
      throw new Error('This is not a simple db file!');
    }
    const cursor = new Cursor(this.pager);
    this.btree = new BTree(this.pager, cursor);
  }

  public get(key: string) {
    if (!this.btree) {
      throw new Error('call `db.open` first!');
    }
    const buf = this.btree.find(Buffer.from(key));
    return buf ? deserialize(buf) : null;
  }
}

然后是 BTreefind 方法,我们先通过 Cursor 来查询到 key 所应该处于的叶子节点,然后在这个叶子节点里搜索是否存在匹配的 KeyValueCell

export class BTreeNode {
  //...

  public find(key: Buffer): Buffer | null {
    if (this.root) {
      this.cursor.reset();
      const node = this.cursor.findLeafNodeByKey(this.root, key);
      return node.findKeyValueCell(key)?.value ?? null;
    }
    return null;
  }
}
export class BTreeNode {
  //...

  public find(key: Buffer): Buffer | null {
    if (this.root) {
      this.cursor.reset();
      const node = this.cursor.findLeafNodeByKey(this.root, key);
      return node.findKeyValueCell(key)?.value ?? null;
    }
    return null;
  }
}

再来仔细看看 findLeafNodeByKey 方法,因为涉及到节点内部的数据查询,这里我们也要依赖 BTreeNode 来做具体的实现,因此这里只是看 findSubtreeOrLeaf 返回的究竟是一个节点,还是一个指向子树的指针,如果是指针的话,就继续读取出来重复这个查找的过程。另外 Cursor 肯定得依赖 Pager 来读取磁盘,同时别忘了给它加上面包屑的功能:

export class Cursor {
  private breadcrumbs: number[] = []; // 存储经过的节点 id
  private index: number = -1; // 用一个指针来记录当前的位置
  private readonly pager: Pager;

  // 每次前进到一个节点就存储它的 id 并把指针前进一步
  private addBreadcrumbs(b: number) {
    this.breadcrumbs.push(b);
    this.index++;
  }

  constructor(pager: Pager) {
    this.pager = pager;
  }

  public reset() {
    this.breadcrumbs = [];
    this.index = -1;
  }

  public getRoot(): BTreeNode | null {
    const [id, buf] = this.pager.readRootPage();
    if (!buf) {
      return null;
    }
    return new BTreeNode(id, buf);
  }

  public findLeafNodeByKey(startNode: BTreeNode, key: Buffer) {
    this.addBreadcrumbs(startNode.id);
    // 这里可能返回一个节点,也可能返回一个指向子树的指针
    let nodeOrPointer = startNode.findSubtreeOrLeaf(key);

    while (typeof nodeOrPointer === 'number') {
      const buf = this.pager.readPageById(nodeOrPointer);
      startNode = new BTreeNode(nodeOrPointer, buf);
      this.addBreadcrumbs(startNode.id);
      nodeOrPointer = startNode.findSubtreeOrLeaf(key);
    }

    return nodeOrPointer;
  }
}
export class Cursor {
  private breadcrumbs: number[] = []; // 存储经过的节点 id
  private index: number = -1; // 用一个指针来记录当前的位置
  private readonly pager: Pager;

  // 每次前进到一个节点就存储它的 id 并把指针前进一步
  private addBreadcrumbs(b: number) {
    this.breadcrumbs.push(b);
    this.index++;
  }

  constructor(pager: Pager) {
    this.pager = pager;
  }

  public reset() {
    this.breadcrumbs = [];
    this.index = -1;
  }

  public getRoot(): BTreeNode | null {
    const [id, buf] = this.pager.readRootPage();
    if (!buf) {
      return null;
    }
    return new BTreeNode(id, buf);
  }

  public findLeafNodeByKey(startNode: BTreeNode, key: Buffer) {
    this.addBreadcrumbs(startNode.id);
    // 这里可能返回一个节点,也可能返回一个指向子树的指针
    let nodeOrPointer = startNode.findSubtreeOrLeaf(key);

    while (typeof nodeOrPointer === 'number') {
      const buf = this.pager.readPageById(nodeOrPointer);
      startNode = new BTreeNode(nodeOrPointer, buf);
      this.addBreadcrumbs(startNode.id);
      nodeOrPointer = startNode.findSubtreeOrLeaf(key);
    }

    return nodeOrPointer;
  }
}

BTreeNode 中的 findSubtreeOrLeaf 则是不断进行对键进行二分查找,还记得我们之前定义的 cell 的分布吗,首先是有序的 cell 指针数组,而 cell 按照写入顺序逆序从页的最后开始往前写入,因此二分查找或者读取都要先通过 cell 的指针来进行。

二分查找的过程还是很简单的,定义左右两个指针,然后不断进行移动逐渐互相逼近:

export function findIndexOfFirstGreatorElement<T, K>(
  array: T[],
  target: K,
  comparator: (
    current: T,
    target: K,
    currentIndex?: number,
    arr?: T[]
  ) => number
): number {
  let start = 0;
  let end = array.length - 1;
  let index = -1;

  while (start <= end) {
    let mid = Math.floor((start + end) / 2);

    const cmp = comparator(array[mid], target, mid, array);

    // move to right side if target is greater.
    if (cmp <= 0) {
      start = mid + 1;
    } else {
      // move left side.
      index = mid;
      end = mid - 1;
    }
  }

  return index;
}
export function findIndexOfFirstGreatorElement<T, K>(
  array: T[],
  target: K,
  comparator: (
    current: T,
    target: K,
    currentIndex?: number,
    arr?: T[]
  ) => number
): number {
  let start = 0;
  let end = array.length - 1;
  let index = -1;

  while (start <= end) {
    let mid = Math.floor((start + end) / 2);

    const cmp = comparator(array[mid], target, mid, array);

    // move to right side if target is greater.
    if (cmp <= 0) {
      start = mid + 1;
    } else {
      // move left side.
      index = mid;
      end = mid - 1;
    }
  }

  return index;
}

为了方便,上面使用了简单的泛型,方便进行类型的推导。

接下来就可以实现 BTreeNode 中关键的 findSubtreeOrLeaf 方法了,这也是一个非常核心的方法,在后面的写入逻辑中时,我们也需要先定位到 key 应该放进去哪一个节点,然后再决定要不要进行分裂:

export class BTreeNode {
  // 一些跟节点有关的常量可以放在静态属性里
  public static readonly HEADER_SIZE = 8;
  public static readonly DEFAULT_FREE_START = 8;
  public static readonly DEFAULT_CELL_START = PAGE_SIZE;
  public static readonly CELL_AREA_END = PAGE_SIZE;
  public static readonly CELL_POINTER_SIZE = 2;

  //...

  // 构造一个快速获取 cell 指针数组的 getter 方法
  private get cellOffsets(): number[] {
    let i = BTreeNode.HEADER_SIZE;
    const buf = this.buffer;
    const res = [];
    while (i < this.freeStart) {
      const offset = buf.readInt16BE(i);
      res.push(offset);
      i += BTreeNode.CELL_POINTER_SIZE;
    }
    return res;
  }

  // 以及对应的 setter 方法
  private set cellOffsets(offsets: number[]) {
    const cellPointers = Buffer.concat(
      offsets.map(val => {
        const buf = Buffer.alloc(BTreeNode.CELL_POINTER_SIZE);
        buf.writeInt16BE(val);
        return buf;
      })
    );
    cellPointers.copy(this.buffer, BTreeNode.HEADER_SIZE);
  }

  // 如果是中间节点就进行二分查找,否则就返回该节点
  public findSubtreeOrLeaf(key: Buffer): BTreeNode | number {
    if (this.isInternalNode()) {
      const currentCellOffsets = this.cellOffsets;

      // 先通过二分查找到第一个比 key 大的 cell 的索引
      const index = findIndexOfFirstGreatorElement(
        currentCellOffsets,
        key,
        (a, b) => Buffer.compare(this.readCellByPointer(a)!.key, b)
      );

      let cell: PointerCell;
      if (index === -1) {
        // the key is greator than or equal to last element
        cell = this.readCellByIndex(-1)! as PointerCell;
      } else if (index === 0) {
        // the key is lesser the first element
        cell = this.readCellByIndex(0)! as PointerCell;
      } else {
        // the key is lesser than element at index, so we return the previous one of index
        cell = this.readCellByIndex(index - 1)! as PointerCell;
      }
      return cell.childPageId;
    } else if (this.isEmptyNode()) {
      // is newly created root node
      return this;
    } else {
      // is leaf node
      return this;
    }
  }
}
export class BTreeNode {
  // 一些跟节点有关的常量可以放在静态属性里
  public static readonly HEADER_SIZE = 8;
  public static readonly DEFAULT_FREE_START = 8;
  public static readonly DEFAULT_CELL_START = PAGE_SIZE;
  public static readonly CELL_AREA_END = PAGE_SIZE;
  public static readonly CELL_POINTER_SIZE = 2;

  //...

  // 构造一个快速获取 cell 指针数组的 getter 方法
  private get cellOffsets(): number[] {
    let i = BTreeNode.HEADER_SIZE;
    const buf = this.buffer;
    const res = [];
    while (i < this.freeStart) {
      const offset = buf.readInt16BE(i);
      res.push(offset);
      i += BTreeNode.CELL_POINTER_SIZE;
    }
    return res;
  }

  // 以及对应的 setter 方法
  private set cellOffsets(offsets: number[]) {
    const cellPointers = Buffer.concat(
      offsets.map(val => {
        const buf = Buffer.alloc(BTreeNode.CELL_POINTER_SIZE);
        buf.writeInt16BE(val);
        return buf;
      })
    );
    cellPointers.copy(this.buffer, BTreeNode.HEADER_SIZE);
  }

  // 如果是中间节点就进行二分查找,否则就返回该节点
  public findSubtreeOrLeaf(key: Buffer): BTreeNode | number {
    if (this.isInternalNode()) {
      const currentCellOffsets = this.cellOffsets;

      // 先通过二分查找到第一个比 key 大的 cell 的索引
      const index = findIndexOfFirstGreatorElement(
        currentCellOffsets,
        key,
        (a, b) => Buffer.compare(this.readCellByPointer(a)!.key, b)
      );

      let cell: PointerCell;
      if (index === -1) {
        // the key is greator than or equal to last element
        cell = this.readCellByIndex(-1)! as PointerCell;
      } else if (index === 0) {
        // the key is lesser the first element
        cell = this.readCellByIndex(0)! as PointerCell;
      } else {
        // the key is lesser than element at index, so we return the previous one of index
        cell = this.readCellByIndex(index - 1)! as PointerCell;
      }
      return cell.childPageId;
    } else if (this.isEmptyNode()) {
      // is newly created root node
      return this;
    } else {
      // is leaf node
      return this;
    }
  }
}

为了方便读取 cell,实现一下根据指针获取 cell 的 readCellByPointer 方法和根据 index 获取 cell 的 readCellByIndex 方法:

private readCellByIndex(index: number): PointerCell | KeyValueCell | null {
  const ptrs = this.cellOffsets;
  const ptr = ptrs.at(index);
  if (typeof ptr === 'undefined') {
    return null;
  }
  return this.readCellByPointer(ptr);
}

private readCellByPointer(ptr: number): PointerCell | KeyValueCell | null {
  const buf = this.buffer;
  if (this.isInternalNode()) {
    const keySize = buf.readInt32BE(ptr);
    const size = PointerCell.calcSize(keySize);
    const cellBuf = buf.slice(ptr, ptr + size);
    return new PointerCell(cellBuf, ptr);
  } else if (this.isLeafNode()) {
    const keySize = buf.readInt32BE(ptr + 1);
    const valueSize = buf.readInt32BE(ptr + 5);
    const size = KeyValueCell.calcSize(keySize, valueSize);
    const cellBuf = buf.slice(ptr, ptr + size);
    return new KeyValueCell(cellBuf, ptr);
  }

  return null;
}
private readCellByIndex(index: number): PointerCell | KeyValueCell | null {
  const ptrs = this.cellOffsets;
  const ptr = ptrs.at(index);
  if (typeof ptr === 'undefined') {
    return null;
  }
  return this.readCellByPointer(ptr);
}

private readCellByPointer(ptr: number): PointerCell | KeyValueCell | null {
  const buf = this.buffer;
  if (this.isInternalNode()) {
    const keySize = buf.readInt32BE(ptr);
    const size = PointerCell.calcSize(keySize);
    const cellBuf = buf.slice(ptr, ptr + size);
    return new PointerCell(cellBuf, ptr);
  } else if (this.isLeafNode()) {
    const keySize = buf.readInt32BE(ptr + 1);
    const valueSize = buf.readInt32BE(ptr + 5);
    const size = KeyValueCell.calcSize(keySize, valueSize);
    const cellBuf = buf.slice(ptr, ptr + size);
    return new KeyValueCell(cellBuf, ptr);
  }

  return null;
}

基于此还可以增加一些 helper 方法方便快速访问内部保存的键:

public firstKey() {
  return this.readCellByIndex(0)?.key!;
}

public lastKey() {
  const cellOffsets = this.cellOffsets;
  return this.readCellByIndex(cellOffsets.length - 1)?.key!;
}

public keys() {
  return this.cellOffsets.map(p => this.readCellByPointer(p)!.key);
}

public keyAt(n: number) {
  return this.readCellByIndex(n)?.key ?? null;
}

public keyCount(): number {
  return this.cellOffsets.length;
}
public firstKey() {
  return this.readCellByIndex(0)?.key!;
}

public lastKey() {
  const cellOffsets = this.cellOffsets;
  return this.readCellByIndex(cellOffsets.length - 1)?.key!;
}

public keys() {
  return this.cellOffsets.map(p => this.readCellByPointer(p)!.key);
}

public keyAt(n: number) {
  return this.readCellByIndex(n)?.key ?? null;
}

public keyCount(): number {
  return this.cellOffsets.length;
}

如此,就已经实现了完整的写入逻辑了!接下来我们可以编写一个简单的测试,同时也提供了一个环境来方便我们后续验证最复杂的写入逻辑。