由一个golang的B-Tree包展开-2(插入)

在上一篇中,说了B-Tree的定义以及Google 一个btree包的一些信息, 这里根据阅读其源代码的理解,实现一个简化版的B-Tree插入操作。

Google的那个golang版本的btree包在应对gc并发读写(copy on write)操作方面做了很多优化,这里我在实现的时候,先忽略这些,只是先实现核心的数据操作部分,不过对于一个生产包来说,gc和并发操作却是是要不得不考虑的事情

参考google/btree的精简版代码

B-Tree结构定义及初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 一个btree的定义结构包括其节点最少度树degree,它决定了每个node可以包括多少键值信息
// root表示根节点
type BTree struct {
degree uint
root *node
}

// node代表btree的一个节点
//一个节点包含了一组排序存放的items,以及子节点的指针数组
type node struct {
items Items
children children
}

// children类型是指向node节点类型的指针数组
type children []*node

// Item是一个接口类型,含有一个Less方法,通过这个接口可以实现类似泛型的功能。
type Items []Item
type Item interface {
Less(than Item) bool
}

// 这里就可以使用IntItem(3)初始化出一个Item类型的变量,因为IntItem实现了Item接口的方法。
type IntItem int

// Less returns true if int(a) < int(b).
func (a IntItem) Less(b Item) bool {
return a < b.(IntItem)
}

具体的初始化btree方法如下:

1
2
3
4
5
6
// 返回一棵空树
func NewTree(degree uint) *BTree {
return &BTree{
degree: degree,
}
}

B-Tree中插入新数据

示例代码

1
2
3
4
tr := t.NewTree(2) //初始化一棵最小度为2的树,即2-3-4树,4阶树
int a = 10;
item := t.IntItem(v) // int 类型转换为IntItem类型
tr.ReplaceOrInsert(item)

插入的思路

思路:
总结起来就是:所有插入都由根向下递归查询,最终找到合适的叶子节点插入,对递归沿途经过的已满节点,进行分裂操作,保证最终插入的叶子节点非满。

插入从跟节点开始进行二分查询,向下递归,知道找到合适的叶子节点进行插入。 在递归向下的过程中,为了不破坏btree的规则,即 len(item) < 2t-1,每次向下递归的目标子树 len(node.children[i].items) 已满的时候,就对node进行一次分裂,保证分裂后的node.children[i].items元素个数小于2t-1.
具体的分裂做法: 由于node.children[i].items的值是单调递增的, 将node.children[i]从中间截断,形成两个node, left_node, right_node,各自拿到原node中items, children的前一半和后一半,并将node.children[i].items最中间那个提出,插入到放到 node.items[i]到位置。 left_node,right_node则作为两个新子节点,替换node.children[i]原本到位置
这样,与分裂前比起来, node.items元素增加了一个, node.children元素也增加了一个,并不会破坏btree的性质。 从经过分裂之后到节点进行插入操作,就可以顺利递归下沉到子树,直到到达叶子节点(进入叶子节点前,该节点也必然已经经历了分裂检查,所以必然是未满节点),进行插入。
当然,每次对node.items进行二分查询定位是,如果发现要插入到元素已经存在,则不进行插入操作,直接返回原内通。

。。。 此处应该有图形展示,稍等,等我找到合适的图床,我就上图,没有图文并茂,十分抱歉。。。 凑合看的插入演示。。。

配套函数

从上面到表述中,已经看到,在每一层节点进行查询时,都要进行一个二分查询定位到操作,这个查询定位的find函数功能如下:
在一个生序排列到数组中,查询目标元素, 如果元素存在,则返回目标元素在数组中索引和元素找到到flag标志,如果元素不存在,则返回第一个大于目标元素到数组索引。 比如[1,3,5]中查询1则返回(0,true), 查询4则返回[2, false]。

代码如下,可以看出主要调用了golang 基础包sort提供的Search方法, 从描述中我们可以看出,search方法使用二分搜索去返回元素中最小的满足[0,n)中最小的满足f(i)函数为true的索引i,如果f(i)为true,则f(i+1)也为true。 这里的f()需要我们自定义传入。

1
2
3
4
5
6
7
8
9
10
func (items Items) find(t Item) (index int, found bool) {

i := sort.Search(len(items), func(i int) bool {
return t.Less(items[i])
})
if i > 0 && !items[i-1].Less(t) {
return i - 1, true
}
return i, false
}

sort.Search

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Search uses binary search to find and return the smallest index i
// in [0, n) at which f(i) is true, assuming that on the range [0, n),
// f(i) == true implies f(i+1) == true. That is, Search requires that
// f is false for some (possibly empty) prefix of the input range [0, n)
// and then true for the (possibly empty) remainder; Search returns
// the first true index. If there is no such index, Search returns n.
// (Note that the "not found" return value is not -1 as in, for instance,
// strings.Index.)
// Search calls f(i) only for i in the range [0, n).
func Search(n int, f func(int) bool) int {
// Define f(-1) == false and f(n) == true.
// Invariant: f(i-1) == false, f(j) == true.
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
if !f(h) {
i = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i
}

插入逻辑实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (t *BTree) ReplaceOrInsert(item Item) Item {

//如果是空树,则创建根后插入
if t.root == nil {
t.root = t.newNode()
t.root.items = append(t.root.items, item)
return nil
}

// 从跟节点向下寻找合适的插入节点,对于新的item,最终的插入会落到叶子节点,
// 下降过程中,每个经过的节点如果已满,聚会进行一次分裂动作,
// 这样保证后续插入的时候,实际插入的节点肯定会有空闲空间供插入

// 对有可能改变根节点的情况进行单独处理
// 如果根结点已满,对根节点进行分裂处理
// 这里的MaxCap返回的是每个节点子树的最大树木,对于4阶树就是4
if len(t.root.items) >= t.MaxCap() {
midIndex := len(t.root.items) / 2
upItem, newNode := t.root.split(midIndex)
newRoot := t.newNode()
newRoot.items = append(newRoot.items, upItem)
newRoot.children = append(newRoot.children, t.root, newNode)
t.root = newRoot
}
// 从根节点开始进行插入动作,
return t.root.insert(item, t.MaxCap())
}

再跳转到node.insert部分代码看具体逻辑:就是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

func (n *node) insert(item Item, maxCap int) Item {
//对node本层的item进行二分查询
// 如果item已经存在,则返回item在n.items中索引位置和true
// 如果不存在,则返回符合item范围的n.children数组元素索引和fasle
index, found := n.items.find(item)

// 如果判断元素存在,则插入操作结束,返回item值
if found {
return item
}

// 如果逻辑走到这里, 说明本层查询没有找到对于item
// 如果已经到叶子节点,则直接插入item到目标位置
/* **
tips:最开始看这部门代码我时有些迷惑的,
为什么这里到了叶子节点就可以直接插入呢?
不用考虑叶子节点已满的情况吗?
仔细缕了一边逻辑,发现时通过对跟节点的分裂判断以及后续
的maybeSplitChild判断操作,保证递归到达叶子节点的时候
根节点的items一定时有空余的空间可以插入的
* **/
if len(n.children) == 0 {
n.items.insertAt(index, item)
return nil
}

// 不是叶子节点,看看i处的子Node是否需要分裂, 这里的操作是为了保证后面进行插入下沉到子节点时,子节点n.children[index]一定未满
if n.maybeSplitChild(index, maxCap) {
// 分裂了,导致当前node的变化,需要重新定位
// 保证插入查询能下沉到items符合范围的子树中
inTree := n.items[index] // 获取新升级的item
switch {
case item.Less(inTree):
// 要插入的item比分裂产生的item小,i没改变
case inTree.Less(item):
index++ // 要插入的item比分裂产生的item大,i++
default:
// 分裂升level的item和插入的item一致,替换
out := n.items[index]
n.items[index] = item
return out
}
}

// 递归插入到items符合插入范围的子树
return n.children[index].insert(item, maxCap)
}

/*
maybeSplitChild会对node.chilren[childIndex]进行判断,
如果其items已满,则进行一次分裂,将node.chilren[childIndex].items最中央的item取出插入到node.items[childIndex]的位置,
同时将node.chilren[childIndex]中央以后的items和children组成一个新node,插入到node.chilren[childIndex]之后

这样做达到了两个目的:
1. node的items和children元素个数分别+1,保证了不破坏btree的属性
2. 保证了后续插入查询递归下沉到node的某一棵子树的时候,子树items未满
目的2特别重要,保证了插入到达子树时直接插入的合法合理
*/
func (n *node) maybeSplitChild(childIndex int, maxCap int) bool {
l := len(n.children[childIndex].items)

if l >= maxCap {
upItem, newNode := n.children[childIndex].split(l / 2)

n.items.insertAt(childIndex, upItem)
n.children.insertAt(childIndex+1, newNode)

return true
} else {
return false
}

}

// split会修改原node,导致原node的items和children减半
/*
tips: 这里在做split的时候,一定不要忘记了对nodechildren的split
*/
func (n *node) split(mid int) (Item, *node) {

if len(n.items)-1 < mid {
panic("error index")
}
upItem := n.items[mid]
newNode := &node{}

newNode.items = append(newNode.items, n.items[mid+1:]...)
if n.children != nil {
newNode.children = append(newNode.children, n.children[mid+1:]...)
}

n.items = n.items[:mid]
if n.children != nil {
n.children = n.children[:mid+1]
}
return upItem, newNode
}

插入演示

exp1:空树直接插入

1
2
3
4
5
6
7
最小度为2的树(2-3-4树)插入例子:
item:1
原btree:
| |
判断为空树,直接插入root.items,结束
新btree:
| 1 |

exp2: 根直接插入

1
2
3
4
5
6
7
8
9
10
 item: 3
原树:
| 1 | 2 |

step1,判断root.items是否已满,已满则进行分裂(未满,不分裂)
step2,执行root.insert,发现已经是最底一层,直接插入(step1保证step2直接插入不破坏tree属性定义)
output:
| 1 | 2 | =》直接插入

| 1 | 2 | 3 | (stop)

exp3: 插入触发根分裂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 item: 4
原树:
| 1 | 2 | 3 |

step1:判断root.items是否已满,已满则进行分裂(已满,需要分裂)

| 1 | 2 | 3 | (原tree)

| 2 |
/ \ (分裂后新的btree)
| 1 | | 3 |

step2: 对新的btree执行插入操作,进入第一层,第一层查询未找到,
进入chilren[1],判断chilren[1]是否需要进行分裂处理(不需要)

step3: 对children[i]进行item插入操作,发现为子节点,直接插入。
操作前 children[1]: | 3 |
插入后chilren[1]: | 3 | 4 |

插入结束,此时btree最终形态为:
| 2 |
/ \
| 1 | | 3 | 4 |

exp4:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
item:5
原树:
| 2 |
/ \
| 1 | | 3 | 4 |

step1:根是否已满判断(未满,接着执行root.insert)

step2: 进入第一层查询,f返回index1, d对node.chilren[1]进行是否已满判断(未满)

step3: 递归进入node.chilre[1],发现到达根节点,直接插入
原node: | 3 | 4 |
新node: | 3 | 4 | 5 |
插入结束,此时的btree最终形态如下:
| 2 |
/ \
| 1 | | 3 | 4 | 5 |

exp5:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
item:6
原树:
| 2 |
/ \
| 1 | | 3 | 4 | 5 |

step1:根是否已满判断(未满,接着执行root.insert)

step2: 进入第一层查询,f返回index1, d对node.children[1]进行是否已满判断
(已满,对node节点在1处执行分裂),
执行完分裂后,树的形态如下,同时下降的子树index+1,插入下沉到node.children[2],即|5|:

| 2 | 4 |
/ \ \
| 1 | | 3 | | 5 |

step3: 递归到node.chilren[2]作为新的node执行insert item, 发现已经到叶子,直接插入
插入前node: | 5 |
插入后node: | 5 | 6 |
插入结束,返回nil, 此时btre最终形态如下:
| 2 | 4 |
/ \ \
| 1 | | 3 | | 5 | 6 |

exp6:
接着插入7的过程和exp5一致,略过,来看插入8:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
item: 8
原树:
| 2 | 4 |
/ \ \
|1| |3| |5|6|7|

step1:根是否已满判断(未满,接着执行root.insert)

step2: 进入第一层查询,f返回index = 2 , d对node.children[2 ]进行是否已满判断
(已满,对node节点在2处执行分裂,分裂后index = 3,分裂后结果如下:

| 2 | 4 | 6 |
/ \ \ \
|1| |3| |5| |7|


step3: 经过step2之后,插入递归到下一层,也就是 |7|这里,发现是叶子,直接插入
插入前节点: | 7 |
插入后节点: | 7 | 8 |
插入结束,返回nil,此时最终的btre形态为
| 2 | 4 | 6 |
/ \ \ \
|1| |3| |5| |7|8|

exp7:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
item: 9
原树:
| 2 | 4 | 6 |
/ \ \ \
|1| |3| |5| |7|8|


step1: 根已满,进行根分裂,分裂后btree:
| 4 |
/ \
| 2 | | 6 |
/ \ / \
|1| |3| |5| |7|8|

step2: 对分裂后的新树执行从根执行insert,
进入第一层,未找到,index = 1 ,root.children[1]未满,进入下一层
step3: 依次递归进入第二层 | 6 |, 第三层 | 7| 8|此时已经到叶子,直接插入

插入前node : |7|8|,
插入后node: |7|8|9|
插入结束,返回nil,最终的btree如下:

| 4 |
/ \
| 2 | | 6 |
/ \ / \
|1| |3| |5| |7|8|9|