Lab3实现的KV服务,只有一个Leader接收所有的请求,在Lab4中,将所有请求分配到多个Leader-Followers集群中,每个Leader处理属于它这一集群管理的请求。
请求的划分方式有很多种,例如Key以'A'开头的请求都交给某一Leader-Followers集群处理,实验中,测试程序负责提供划分方案。
本文中,同样也是在代码中,Group
表示一个Leader-Followers集群,Gid
为它的标识,Shard
表示所有请求的一个子集,Config
表示一个划分方案。本实验中,所有请求分为NShards = 10
份,Server给测试程序提供四个接口。
- Join:为某个Group添加节点,或添加Group。
- Leave:移除某个Group。
- Move:某个Shard分配给某个Group处理。
- Query:上面三个操作都会改变当前Config,Query返回某个历史版本的Config。
Controller Server也会组成一个集群,使用Raft做容灾处理,因此,Controller Server和KVServer是一样的结构。只不过是从Get
、Put
、Append
变成了Join
、Leave
、Move
、Query
,因此直接把Lab3A的代码复制过来,再稍加改动就OK了。
Server
configs存储了所有的config版本,每一次进行Join或Leave或Move都会基于当前config追加一个新的config到configs中,之所以要保存所有的config,主要是为了测试程序检查。
Num就是版本编号,Shards记录当前方案,每个shard由哪个group负责处理,这个分配方案应该是均衡的,所以每一次config的变化都要重新分配,使Shards平均。
Groups记录了每个group中都有哪些节点。
type ShardCtrler struct {
configs []Config
}
type Config struct {
Num int // config number
Shards [NShards]int // shard -> gid
Groups map[int][]string // gid -> servers[]
}
复制代码
Join
先获取当前config,即configs中最后一个config,然后基于这个config做修改,把新Join的server加入到config.Groups中,然后把config追加到configs后。
func (sc *ShardCtrler) Join(servers map[int][]string) {
config := sc.GetConfig()
for gid, group := range servers {
config.Groups[gid] = append([]string(nil), group...)
}
sc.configs = append(sc.configs, config)
}
复制代码
Leave
和Join是类似的,删除Groups中的某个group。
func (sc *ShardCtrler) Leave(gids []int) {
config := sc.GetConfig()
for _, gid := range gids {
delete(config.Groups, gid)
}
sc.configs = append(sc.configs, config)
}
复制代码
Move
Move只需要修改Shards结构即可。和Lab3一样,因为Query是读请求,所以在RPC中直接返回configs[num]
即可。
func (sc *ShardCtrler) Move(shard int, gid int) {
config := sc.GetConfig()
config.Shards[shard] = gid
sc.configs = append(sc.configs, config)
}
复制代码
Balance
每一次写请求后都需要修改Shards保证所有group负责的shard数量最大和最小之差不超过1,这里先收集所有group负责的shard数量,然后进行排序。
先算出平均每个group应该负责多少个shard,多的拿出来,少的加进去,为了保证每个Controller Server动作一致,所以shard数量相等的,按照gid排序。
这里给出Balance的代码,go不太熟悉,写的比较乱,但能过测试就行了。
func (sc *ShardCtrler) Balance() {
config := sc.configs[len(sc.configs)-1]
if len(config.Groups) == 0 {
return
}
m_gid_shardnum := make(map[int]int)
unassigned := make([]int, 0)
avgShardNum := NShards / len(config.Groups)
for gid := range config.Groups {
m_gid_shardnum[gid] = 0
}
for shard, gid := range config.Shards {
if _, ok := config.Groups[gid]; !ok {
unassigned = append(unassigned, shard)
continue
}
if m_gid_shardnum[gid] < avgShardNum {
m_gid_shardnum[gid]++
} else {
unassigned = append(unassigned, shard)
}
}
s_gid_shardnum := make([][2]int, 0)
for gid, shardnum := range m_gid_shardnum {
s_gid_shardnum = append(s_gid_shardnum, [2]int{gid, shardnum})
}
sort.Slice(s_gid_shardnum, func(i, j int) bool {
if s_gid_shardnum[i][1] == s_gid_shardnum[j][1] {
return s_gid_shardnum[i][0] < s_gid_shardnum[j][0]
}
return s_gid_shardnum[i][1] < s_gid_shardnum[j][1]
})
base := 0
for _, v := range s_gid_shardnum {
gid := v[0]
shardnum := v[1]
for i := 0; i < avgShardNum-shardnum; i++ {
config.Shards[unassigned[base+i]] = gid
}
if avgShardNum-shardnum > 0 {
base += avgShardNum - shardnum
}
}
for _, v := range s_gid_shardnum {
if base >= len(unassigned) {
break
}
gid := v[0]
config.Shards[unassigned[base]] = gid
base++
}
sc.configs[len(sc.configs)-1].Shards = config.Shards
}
复制代码
实验总结
Lab4A和Lab3A差不多,所以也比较简单。唯一要注意的点是,如果你不熟悉go语言,需要了解一下go的传值和传引用相关的坑,go中,切片和map都是传引用的,所以要注意深拷贝的问题。
最后,为了证明我不是在乱写,附上我的测试结果。
近期评论