实现查询逻辑
再来重温一下之前提到的查询的算法,查询时从根节点开始,通过二分查找找到所属的节点或者通往子树的指针,重复这个过程直到抵达最后的叶子节点,然后继续二分查找看是否存在匹配的 Cell。整体来看还是比较容易实现的,首先为 Database
类实现一些初始化的逻辑和查询的接口:
export class Database {
private readonly pager: Pager;
private btree: BTree | null = null;
constructor(filePath: string) {
const isExist = fs.existsSync(filePath);
const fd = fs.openSync(filePath, isExist ? 'r+' : 'w+');
this.pager = new Pager(fd);
}
public open() {
if (!this.pager.verifyFileHeader()) {
throw new Error('This is not a simple db file!');
}
const cursor = new Cursor(this.pager);
this.btree = new BTree(this.pager, cursor);
}
public get(key: string) {
if (!this.btree) {
throw new Error('call `db.open` first!');
}
const buf = this.btree.find(Buffer.from(key));
return buf ? deserialize(buf) : null;
}
}
export class Database {
private readonly pager: Pager;
private btree: BTree | null = null;
constructor(filePath: string) {
const isExist = fs.existsSync(filePath);
const fd = fs.openSync(filePath, isExist ? 'r+' : 'w+');
this.pager = new Pager(fd);
}
public open() {
if (!this.pager.verifyFileHeader()) {
throw new Error('This is not a simple db file!');
}
const cursor = new Cursor(this.pager);
this.btree = new BTree(this.pager, cursor);
}
public get(key: string) {
if (!this.btree) {
throw new Error('call `db.open` first!');
}
const buf = this.btree.find(Buffer.from(key));
return buf ? deserialize(buf) : null;
}
}
然后是 BTree
的 find
方法,我们先通过 Cursor
来查询到 key 所应该处于的叶子节点,然后在这个叶子节点里搜索是否存在匹配的 KeyValueCell
:
export class BTreeNode {
//...
public find(key: Buffer): Buffer | null {
if (this.root) {
this.cursor.reset();
const node = this.cursor.findLeafNodeByKey(this.root, key);
return node.findKeyValueCell(key)?.value ?? null;
}
return null;
}
}
export class BTreeNode {
//...
public find(key: Buffer): Buffer | null {
if (this.root) {
this.cursor.reset();
const node = this.cursor.findLeafNodeByKey(this.root, key);
return node.findKeyValueCell(key)?.value ?? null;
}
return null;
}
}
再来仔细看看 findLeafNodeByKey
方法,因为涉及到节点内部的数据查询,这里我们也要依赖 BTreeNode
来做具体的实现,因此这里只是看 findSubtreeOrLeaf
返回的究竟是一个节点,还是一个指向子树的指针,如果是指针的话,就继续读取出来重复这个查找的过程。另外 Cursor 肯定得依赖 Pager 来读取磁盘,同时别忘了给它加上面包屑的功能:
export class Cursor {
private breadcrumbs: number[] = []; // 存储经过的节点 id
private index: number = -1; // 用一个指针来记录当前的位置
private readonly pager: Pager;
// 每次前进到一个节点就存储它的 id 并把指针前进一步
private addBreadcrumbs(b: number) {
this.breadcrumbs.push(b);
this.index++;
}
constructor(pager: Pager) {
this.pager = pager;
}
public reset() {
this.breadcrumbs = [];
this.index = -1;
}
public getRoot(): BTreeNode | null {
const [id, buf] = this.pager.readRootPage();
if (!buf) {
return null;
}
return new BTreeNode(id, buf);
}
public findLeafNodeByKey(startNode: BTreeNode, key: Buffer) {
this.addBreadcrumbs(startNode.id);
// 这里可能返回一个节点,也可能返回一个指向子树的指针
let nodeOrPointer = startNode.findSubtreeOrLeaf(key);
while (typeof nodeOrPointer === 'number') {
const buf = this.pager.readPageById(nodeOrPointer);
startNode = new BTreeNode(nodeOrPointer, buf);
this.addBreadcrumbs(startNode.id);
nodeOrPointer = startNode.findSubtreeOrLeaf(key);
}
return nodeOrPointer;
}
}
export class Cursor {
private breadcrumbs: number[] = []; // 存储经过的节点 id
private index: number = -1; // 用一个指针来记录当前的位置
private readonly pager: Pager;
// 每次前进到一个节点就存储它的 id 并把指针前进一步
private addBreadcrumbs(b: number) {
this.breadcrumbs.push(b);
this.index++;
}
constructor(pager: Pager) {
this.pager = pager;
}
public reset() {
this.breadcrumbs = [];
this.index = -1;
}
public getRoot(): BTreeNode | null {
const [id, buf] = this.pager.readRootPage();
if (!buf) {
return null;
}
return new BTreeNode(id, buf);
}
public findLeafNodeByKey(startNode: BTreeNode, key: Buffer) {
this.addBreadcrumbs(startNode.id);
// 这里可能返回一个节点,也可能返回一个指向子树的指针
let nodeOrPointer = startNode.findSubtreeOrLeaf(key);
while (typeof nodeOrPointer === 'number') {
const buf = this.pager.readPageById(nodeOrPointer);
startNode = new BTreeNode(nodeOrPointer, buf);
this.addBreadcrumbs(startNode.id);
nodeOrPointer = startNode.findSubtreeOrLeaf(key);
}
return nodeOrPointer;
}
}
BTreeNode
中的 findSubtreeOrLeaf
则是不断进行对键进行二分查找,还记得我们之前定义的 cell 的分布吗,首先是有序的 cell 指针数组,而 cell 按照写入顺序逆序从页的最后开始往前写入,因此二分查找或者读取都要先通过 cell 的指针来进行。
二分查找的过程还是很简单的,定义左右两个指针,然后不断进行移动逐渐互相逼近:
export function findIndexOfFirstGreatorElement<T, K>(
array: T[],
target: K,
comparator: (
current: T,
target: K,
currentIndex?: number,
arr?: T[]
) => number
): number {
let start = 0;
let end = array.length - 1;
let index = -1;
while (start <= end) {
let mid = Math.floor((start + end) / 2);
const cmp = comparator(array[mid], target, mid, array);
// move to right side if target is greater.
if (cmp <= 0) {
start = mid + 1;
} else {
// move left side.
index = mid;
end = mid - 1;
}
}
return index;
}
export function findIndexOfFirstGreatorElement<T, K>(
array: T[],
target: K,
comparator: (
current: T,
target: K,
currentIndex?: number,
arr?: T[]
) => number
): number {
let start = 0;
let end = array.length - 1;
let index = -1;
while (start <= end) {
let mid = Math.floor((start + end) / 2);
const cmp = comparator(array[mid], target, mid, array);
// move to right side if target is greater.
if (cmp <= 0) {
start = mid + 1;
} else {
// move left side.
index = mid;
end = mid - 1;
}
}
return index;
}
为了方便,上面使用了简单的泛型,方便进行类型的推导。
接下来就可以实现 BTreeNode
中关键的 findSubtreeOrLeaf
方法了,这也是一个非常核心的方法,在后面的写入逻辑中时,我们也需要先定位到 key 应该放进去哪一个节点,然后再决定要不要进行分裂:
export class BTreeNode {
// 一些跟节点有关的常量可以放在静态属性里
public static readonly HEADER_SIZE = 8;
public static readonly DEFAULT_FREE_START = 8;
public static readonly DEFAULT_CELL_START = PAGE_SIZE;
public static readonly CELL_AREA_END = PAGE_SIZE;
public static readonly CELL_POINTER_SIZE = 2;
//...
// 构造一个快速获取 cell 指针数组的 getter 方法
private get cellOffsets(): number[] {
let i = BTreeNode.HEADER_SIZE;
const buf = this.buffer;
const res = [];
while (i < this.freeStart) {
const offset = buf.readInt16BE(i);
res.push(offset);
i += BTreeNode.CELL_POINTER_SIZE;
}
return res;
}
// 以及对应的 setter 方法
private set cellOffsets(offsets: number[]) {
const cellPointers = Buffer.concat(
offsets.map(val => {
const buf = Buffer.alloc(BTreeNode.CELL_POINTER_SIZE);
buf.writeInt16BE(val);
return buf;
})
);
cellPointers.copy(this.buffer, BTreeNode.HEADER_SIZE);
}
// 如果是中间节点就进行二分查找,否则就返回该节点
public findSubtreeOrLeaf(key: Buffer): BTreeNode | number {
if (this.isInternalNode()) {
const currentCellOffsets = this.cellOffsets;
// 先通过二分查找到第一个比 key 大的 cell 的索引
const index = findIndexOfFirstGreatorElement(
currentCellOffsets,
key,
(a, b) => Buffer.compare(this.readCellByPointer(a)!.key, b)
);
let cell: PointerCell;
if (index === -1) {
// the key is greator than or equal to last element
cell = this.readCellByIndex(-1)! as PointerCell;
} else if (index === 0) {
// the key is lesser the first element
cell = this.readCellByIndex(0)! as PointerCell;
} else {
// the key is lesser than element at index, so we return the previous one of index
cell = this.readCellByIndex(index - 1)! as PointerCell;
}
return cell.childPageId;
} else if (this.isEmptyNode()) {
// is newly created root node
return this;
} else {
// is leaf node
return this;
}
}
}
export class BTreeNode {
// 一些跟节点有关的常量可以放在静态属性里
public static readonly HEADER_SIZE = 8;
public static readonly DEFAULT_FREE_START = 8;
public static readonly DEFAULT_CELL_START = PAGE_SIZE;
public static readonly CELL_AREA_END = PAGE_SIZE;
public static readonly CELL_POINTER_SIZE = 2;
//...
// 构造一个快速获取 cell 指针数组的 getter 方法
private get cellOffsets(): number[] {
let i = BTreeNode.HEADER_SIZE;
const buf = this.buffer;
const res = [];
while (i < this.freeStart) {
const offset = buf.readInt16BE(i);
res.push(offset);
i += BTreeNode.CELL_POINTER_SIZE;
}
return res;
}
// 以及对应的 setter 方法
private set cellOffsets(offsets: number[]) {
const cellPointers = Buffer.concat(
offsets.map(val => {
const buf = Buffer.alloc(BTreeNode.CELL_POINTER_SIZE);
buf.writeInt16BE(val);
return buf;
})
);
cellPointers.copy(this.buffer, BTreeNode.HEADER_SIZE);
}
// 如果是中间节点就进行二分查找,否则就返回该节点
public findSubtreeOrLeaf(key: Buffer): BTreeNode | number {
if (this.isInternalNode()) {
const currentCellOffsets = this.cellOffsets;
// 先通过二分查找到第一个比 key 大的 cell 的索引
const index = findIndexOfFirstGreatorElement(
currentCellOffsets,
key,
(a, b) => Buffer.compare(this.readCellByPointer(a)!.key, b)
);
let cell: PointerCell;
if (index === -1) {
// the key is greator than or equal to last element
cell = this.readCellByIndex(-1)! as PointerCell;
} else if (index === 0) {
// the key is lesser the first element
cell = this.readCellByIndex(0)! as PointerCell;
} else {
// the key is lesser than element at index, so we return the previous one of index
cell = this.readCellByIndex(index - 1)! as PointerCell;
}
return cell.childPageId;
} else if (this.isEmptyNode()) {
// is newly created root node
return this;
} else {
// is leaf node
return this;
}
}
}
为了方便读取 cell,实现一下根据指针获取 cell 的 readCellByPointer
方法和根据 index 获取 cell 的 readCellByIndex
方法:
private readCellByIndex(index: number): PointerCell | KeyValueCell | null {
const ptrs = this.cellOffsets;
const ptr = ptrs.at(index);
if (typeof ptr === 'undefined') {
return null;
}
return this.readCellByPointer(ptr);
}
private readCellByPointer(ptr: number): PointerCell | KeyValueCell | null {
const buf = this.buffer;
if (this.isInternalNode()) {
const keySize = buf.readInt32BE(ptr);
const size = PointerCell.calcSize(keySize);
const cellBuf = buf.slice(ptr, ptr + size);
return new PointerCell(cellBuf, ptr);
} else if (this.isLeafNode()) {
const keySize = buf.readInt32BE(ptr + 1);
const valueSize = buf.readInt32BE(ptr + 5);
const size = KeyValueCell.calcSize(keySize, valueSize);
const cellBuf = buf.slice(ptr, ptr + size);
return new KeyValueCell(cellBuf, ptr);
}
return null;
}
private readCellByIndex(index: number): PointerCell | KeyValueCell | null {
const ptrs = this.cellOffsets;
const ptr = ptrs.at(index);
if (typeof ptr === 'undefined') {
return null;
}
return this.readCellByPointer(ptr);
}
private readCellByPointer(ptr: number): PointerCell | KeyValueCell | null {
const buf = this.buffer;
if (this.isInternalNode()) {
const keySize = buf.readInt32BE(ptr);
const size = PointerCell.calcSize(keySize);
const cellBuf = buf.slice(ptr, ptr + size);
return new PointerCell(cellBuf, ptr);
} else if (this.isLeafNode()) {
const keySize = buf.readInt32BE(ptr + 1);
const valueSize = buf.readInt32BE(ptr + 5);
const size = KeyValueCell.calcSize(keySize, valueSize);
const cellBuf = buf.slice(ptr, ptr + size);
return new KeyValueCell(cellBuf, ptr);
}
return null;
}
基于此还可以增加一些 helper 方法方便快速访问内部保存的键:
public firstKey() {
return this.readCellByIndex(0)?.key!;
}
public lastKey() {
const cellOffsets = this.cellOffsets;
return this.readCellByIndex(cellOffsets.length - 1)?.key!;
}
public keys() {
return this.cellOffsets.map(p => this.readCellByPointer(p)!.key);
}
public keyAt(n: number) {
return this.readCellByIndex(n)?.key ?? null;
}
public keyCount(): number {
return this.cellOffsets.length;
}
public firstKey() {
return this.readCellByIndex(0)?.key!;
}
public lastKey() {
const cellOffsets = this.cellOffsets;
return this.readCellByIndex(cellOffsets.length - 1)?.key!;
}
public keys() {
return this.cellOffsets.map(p => this.readCellByPointer(p)!.key);
}
public keyAt(n: number) {
return this.readCellByIndex(n)?.key ?? null;
}
public keyCount(): number {
return this.cellOffsets.length;
}
如此,就已经实现了完整的写入逻辑了!接下来我们可以编写一个简单的测试,同时也提供了一个环境来方便我们后续验证最复杂的写入逻辑。