当前位置: 首页 > news >正文

郑州设计网站公司郑州网站关键词排名

郑州设计网站公司,郑州网站关键词排名,建设旅游网站数据库设计,佛山电脑培训班哪里有6.584-LabA HomeworkReference CodeReference Blog 通过作业提供的概览图可以看出整个系统的组成:用户 Clerk 会发出命令(Get、Put、Append)到每个 Service,每个 Service 接收到命令后向下传递到 RaftCode 层,由 RaftC…

6.584-LabA

  • Homework
  • Reference Code
  • Reference Blog

diagram of Raft interactions

通过作业提供的概览图可以看出整个系统的组成:用户 Clerk 会发出命令(Get、Put、Append)到每个 Service,每个 Service 接收到命令后向下传递到 RaftCode 层,由 RaftCode 层负责自己的“事情”(选举、生成log、提交Commit log、应用Apply log…)。 RaftCode 层将 Apply log通过“通道”传递到自己的 Service,Service 将Apply log的命令(Get、Put、Append)应用到自己的本地数据库db。

本次作业是实现 RaftCode 之上的“应用层”,主要是三个方面:

  1. Service 接收 Clerk发来的命令;
  2. Service 将接收的命令下放到自己的 RaftCode 层;
  3. RaftCode 层将自己提交Apply 的 log 返回给自己上层的 Service,Service 将接收到 RaftCode 已经 Apply log 应用到 数据库db;

文件包含的函数介绍

kvraft/common.go

包含 Clerk 与 Service 进行 RPC 的 Args、Reply 结构体。

PutAppendArgs & PutAppendReply
由于 Put 和 Append 命令都包含一个 Key 和 Value,所以可以将 Put & Append 信息合并为同一个结构体。
Op 来区分 Put 和Append;
Identifier:表示这个命令来自哪个 Clerk;
Seq:表示这个命令来自 Clerk 的第几条命令;

Identifier + Seq共同构成了命令的唯一标号。

type PutAppendArgs struct {Key        stringValue      stringOp         string // Op = "Put" or "Append"Identifier int64Seq        uint64
}type PutAppendReply struct {Err Err
}

GetArgs & GetReply
Get 命令只包含一个 Key

type GetArgs struct {Key        stringIdentifier int64Seq        uint64
}type GetReply struct {Err   ErrValue string
}

kvraft/client.go

负责将 Clerk 的命令传递给 Service;根据接收到 Service 处理结果的信息,并做出相应的反应。

Clerk结构体的字段
identifier:会有多个 Clerk 并行向 Service 发送命令,为了区分 Clerk 要给一个身份标识;
leaderId:记录当前为 Leader 的 Service,不用每次都要轮询去找 Leader Service.
seq:为 Clerk 下条发送命令的编号

type Clerk struct {servers    []*labrpc.ClientEnd // 所有的Serviceseq        uint64 // 单调递增序列号identifier int64  // 标识clerkleaderId   int    // 记录leader的id
}

MakeClerk():

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {ck := new(Clerk)ck.servers = serversck.seq = 0ck.identifier = nrand()return ck
}

创建一个 Clerk,并将 ClerkID 初始化为一个唯一的id。

Get_Seq()

func (ck *Clerk) Get_Seq() (SendSeq uint64) {SendSeq = ck.seqck.seq += 1return
}

返回一个标号给当前的命令,并自增1做为下一条命令的标号。

Get(key string) string

  1. 将 Get 包装为 GetArgs 通过 PRC 发送给 Service
  2. 得到 Service 的回复 GetReply
    Service的回复有几种情况:
    2.1 接收的 Service 并不是 Leader 或者 是一个 过时的Leader,那么继续询问下一个 Service
    2.2 当通道关闭(至于为什么会通道关闭,后面解释)或者处理超时都继续轮询这个 Service 发送命令
    2.3 没有出现错误,则return reply.Value(如果没有Key的话,会返回空字符串)
func (ck *Clerk) Get(key string) string {args := &GetArgs{Key: key, Identifier: ck.identifier, Seq: ck.Get_Seq()}for {reply := GetReply{}ok := ck.servers[ck.leaderId].Call("KVServer.Get", args, &reply)if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated { // 询问的server是follower or 过时的leader,就继续轮询下一个serverck.leaderId = (ck.leaderId + 1) % len(ck.servers)continue}switch reply.Err { // 当返回 通道关闭&操作超时 则继续轮询这个leadercase ErrChanClose:continuecase ErrHandleOpTimeOut:continuecase ErrKeyNotExist:return reply.Value // 不存在Key,那么Value就是默认零值--空字符串""}return reply.Value}
}

PutAppend()
Get()做同样处理。不过不会出现ErrKeyNotExist这个错误,也没有返回值。

func (ck *Clerk) PutAppend(key string, value string, op string) {// Identifier:表示该Com来自哪个clerk 、Seq:表示来自第几个Cmd。 Identifier+Seq构成Cmd的唯一标识args := &PutAppendArgs{Key: key, Value: value, Op: op, Identifier: ck.identifier, Seq: ck.Get_Seq()}for {reply := PutAppendReply{} // 重试RPC时, 需要新建reply结构体, 重复使用同一个结构体将导致labgob报错ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, &reply)if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {ck.leaderId = (ck.leaderId + 1) % len(ck.servers)continue}switch reply.Err {case ErrChanClose:continuecase ErrHandleOpTimeOut:continue}return}
}

kvraft/server.go

这个文件中主要实现的逻辑:

  1. Service 接收到命令后传递给 Raft ;
  2. Service 接收到 Raft 提交后的命令后 Apply 到本地数据库db中;
  3. 如果是 Leader 还肩负处理完之后通知 Clerk 的职责;

相关结构体

type Op struct {OpType     OpType // 操作类型Key        string Val        stringSeq        uint64 // 该操作命令的Seq编号Identifier int64 // 发出该操作命令的Clerk的ID
}
type result struct { // 存储一个请求的序列号和结果LastSeq uint64Err     ErrValue   stringResTerm int // ResTerm记录commit被apply时的term 因为其可能与Start相比发生了变化, 需要将这一信息返回给客户端
}
type KVServer struct {mu         sync.Mutexme         intrf         *raft.RaftapplyCh    chan raft.ApplyMsgdead       int32                // set by Kill()// Code HerewaiCh      map[int]*chan result // 映射 startIndex->Ch 纪录等待commit信息的RPC handler的通道historyMap map[int64]*result    // 映射 Identifier->*result 记录某clerk的最高序列号的请求的序列号和结果resultmaxraftstate int // snapshot if log grows this bigmaxLen       intdb           map[string]string
}

RPC Handler:Get() & PutAppend()

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {_, isLeader := kv.rf.GetState()if !isLeader { // 访问的server不是leaderreply.Err = ErrNotLeaderreturn}opArgs := &Op{OpType: OpGet, Key: args.Key, Seq: args.Seq, Identifier: args.Identifier}res := kv.HandleOp(opArgs)reply.Err = res.Errreply.Value = res.Value
}
// Get和PutAppend都将请求封装成Op结构体, 统一给HandleOp处理
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {_, isLeader := kv.rf.GetState()if !isLeader {reply.Err = ErrNotLeaderreturn}opArgs := &Op{Key: args.Key, Val: args.Value, Seq: args.Seq, Identifier: args.Identifier}if args.Op == "Put" {opArgs.OpType = OpPut}if args.Op == "Append" {opArgs.OpType = OpAppend}res := kv.HandleOp(opArgs)reply.Err = res.Err
}

可以从代码中看到 Get()PutAppend()的逻辑基本相似:

  1. 先判断下层的 Raft 是否为 Leader,若不是那么就返回ErrNotLeader。因为在 Raft 层中,只有 Leader 能接收命令,由 Leader 通过“心跳”发送给 Follower。
  2. 将接收到的命令(Get、Put、Append)同一封装为 Op结构体。
  3. 将封装命令的Op结构体传入HandleOp()函数进一步处理并得到返回的结果。

HandleOp()

func (kv *KVServer) HandleOp(opArgs *Op) (res result) {startIndex, startTerm, isLeader := kv.rf.Start(*opArgs) // 这里调用Raft层,将Clerk的Cmd下传到Raftif !isLeader {return result{Err: ErrNotLeader, Value: ""}}kv.mu.Lock()newCh := make(chan result)kv.waiCh[startIndex] = &newCh // ApplyHandler 通过通道将Cmd的结果返回kv.mu.Unlock()                // Start函数耗时较长, 先解锁defer func() {kv.mu.Lock()delete(kv.waiCh, startIndex)close(newCh)kv.mu.Unlock()}()select { // 管道多路复用的控制结构,同时监测多个管道是否可用case <-time.After(HandOpTimeOut):res.Err = ErrHandleOpTimeOutreturncase msg, success := <-newCh: // 取出ApplyHandler的结果if !success {// 通道已经关闭, 有另一个协程收到了消息 或 通道被更新的RPC覆盖res.Err = ErrChanClosereturn} else if success && msg.ResTerm == startTerm {res = msgreturn} else {// Cmd执行完传递回来的term与一开始传入Cmd建立log的term不一致,说明这个leader可能过期了res.Err = ErrLeaderOutDatedres.Value = ""return}}
}

在函数的第一行调用了 Raft中的 Start 函数kv.rf.Start(*opArgs),Start函数如下图:
Raft.Start

可以看出,start()函数会接收一个命令,判断是否是 Leader,然后会将命令封装为Entry插入 Raft 的 log 中,返回(这条命令在 log 中的全局下标,插入该条命令时的 Term,是否为 Leader)。

回到HandleOp函数的逻辑:

  1. 判断 RaftCode 层是否为 Leader,若不是则返回ErrNotLeader
  2. 利用插入的命令在 RaftCode 层的 log 中的下标索引映射一个通道,后面利用这个通道获取 Apply命令到本地后的结果
  3. 检查是否超时,若超时则返回ErrHandleOpTimeOut
  4. 若在规定时间(2S)内接收到了 ApplyHandler放到通道中的结果的话,就取出通道中的结果
    4.1 要提前判断通道是否关闭。设想一下这种情况,有一个 RPC 信息已经创建了通道Ch1,然后执行ApplyHandler之后因为某种原因无法进行而“死掉”(可能是网络原因),Clerk 那边超时重发一个包含相同编号命令的 RPC 创建了通道Ch2覆盖了之前的通道Ch1。不对,覆盖不了之前的通道Ch1哇,当两个 RPC 命令传递给 Raft 后返回的startIndex一定不会相同,创建的通道就不会覆盖哇,不懂了,QAQ(有人懂这里通道为什么会提前关闭呢,请不吝赐教)。
    4.2 如果 msg.ResTerm != startTerm表明已经上个 Leader 已经过期了,已经不属于上个 Term 了。

HandleOp中的selectswitch作用相似,不过select管道的多路复用,用于检测多个管道是否能用

ApplyHandler()

func (kv *KVServer) ApplyHandler() {for !kv.killed() {log := <-kv.applyCh // Raft层处理完负责的部分(选举、生成日志、Snapshot等),Raft将提交的Cmd通过通道应用到K/V的db(数据库)if log.CommandValid {op := log.Command.(Op) // 类型断言:检查变量是否为某种类型kv.mu.Lock()var res resultneedApply := false //判断这个log是否需要被再次应用到K/Vdbif hisMap, isexist := kv.historyMap[op.Identifier]; isexist {if hisMap.LastSeq == op.Seq { // 历史记录存在且Seq相同,直接返回之前的历史结果res = *hisMap} else if hisMap.LastSeq < op.Seq {needApply = true // 历史记录中的Cmd是之前的Cmd,而这个是更新的Seq的Cmd仍需要在db中创建}} else { // 历史db中没有该记录,需要创建needApply = true}_, isLeader := kv.rf.GetState()if needApply {// 在K/Vdb上执行log中的Cmdres = kv.DBExecute(&op, isLeader)res.ResTerm = log.SnapshotTerm// 更新历史的记录kv.historyMap[op.Identifier] = &res}if !isLeader { // kv.rf不是leader就处理下一个logkv.mu.Unlock()continue}// 是leader则还需要额外通知handler处理clerk回复ch, isexist := kv.waiCh[log.CommandIndex]if !isexist {// 接收端的通道已经被删除了并且当前节点是 leader, 说明这是重复的请求, 但这种情况不应该出现, 不然panic// Raft 层可能因为网络等某种原因,发送了两次 apply 同一个 log 的请求,第二次发现通道已关闭,就跳过处理下一个 applykv.mu.Unlock()continue}kv.mu.Unlock()func() {defer func() {if recover() != nil {// 如果这里有 panic,是因为通道关闭DPrintf("leader %v ApplyHandler 发现 identifier %v Seq %v 的管道不存在, 应该是超时被关闭了", kv.me, op.Identifier, op.Seq)}}()res.ResTerm = log.SnapshotTerm*ch <- res // 这里将结果通过通道返回给}()}}
}

逻辑:

  1. 取出 RaftCode 放入通道 applyCh Apply 的 log,要保证取出 log 中的命令 Cmd 是有效的。
  2. 需要判断命令 Cmd 是否在本地数据库db应用过,如果 hisMap.LastSeq == op.Seq表明之前执行过,直接返回保存的结果。如果不存在 或者 保存的hisMap.Seq < op.Seq表明这是编号为op.Identifier的 Clerk 新的 Cmd,均需要在本地数据库db中 Apply
  3. 如果命令需要在本地数据库db中应用则调用函数DBExecute在本地数据库 apply 命令
  4. 如果 Service 是 Leader 的话还需要负责向 Clerk 通知在本地数据库 apply 的结果,如果是 Follower 的话就处理下一个 log 即可。
    4.1 通过在HandleOp中创建的通道返回结果,要先判断通道是否存在。Raft 层可能因为网络等某种原因,发送了两次 apply 同一个 log 的请求,第二次发现通道已关闭,就跳过处理下一个 apply

有关恢复panicrecover函数的使用请跳转:Blog

http://www.dt0577.cn/news/42481.html

相关文章:

  • 外国s网站建设百度普通下载
  • 稳定的手机网站设计网站注册时间查询
  • 手把手教你做网站百度指数查询入口
  • 软件园做网站晨阳seo顾问
  • 自己开网站能赚钱吗怎么优化一个网站
  • 如何给一个网站做压测关键词排名优化系统
  • 一流的山西网站建设西安网站建设哪家好
  • 江西汽车网站建设云优化seo
  • wordpress 分类主题长沙企业seo服务
  • 苏州企业网站公司都有哪些网络营销的缺点及建议
  • 厚街镇仿做网站企业网络营销策划方案范文
  • 微信 网站提成方案点做展示型网站设计公司
  • dw 怎么做钓鱼网站关键词爱站网关键词挖掘工具
  • 开发一款游戏需要多少钱王通seo
  • php高级网站开发谷歌外贸
  • 网站规划主要内容sem优化怎么做
  • 网站建设应遵循哪几项原则广告
  • 做网站的有什么软件百度文库官网登录入口
  • 做外国独立网站广州网站排名推广
  • 微信开放平台 网站开发互联网平台
  • 一款蛋糕食品类企业手机网站源码智推教育seo课程
  • 做网站工资待遇如何抖音搜索排名
  • 做网站用什么团建河南网站排名优化
  • 烟台外贸网站建设公司关键词排名优化公司地址
  • 360免费建站pomhub惠州seo按天计费
  • 网站备案流程公安长沙专业网络推广公司
  • 公司电脑为什么有的网站打不开关键词seo排名怎么样
  • 调用其他网站php页面江阴百度推广公司
  • 免费开源建站教学网站厦门关键词优化网站
  • 大气腐蚀网站建设想做推广哪个平台好