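// NOTE: the two lines below are residue from the web page this file was
// copied from; they are kept as comments so that the file parses as Go.
//
// Site banner: newly registered users should enter and save an email address,
// then activate the account from that mailbox; afterwards the email address
// can be used to log in directly.
//
// scalable_BFT.go 9.38 KB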
package consensus

import (
	"Walnut-HS/pkg/network"
	"Walnut-HS/pkg/protobuf"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"sort"
	"sync"
	"time"
)

// Tunable protocol parameters.
var (
	// MAXMESSAGE is set to 2048.
	// NOTE(review): it is not referenced in the visible part of this file;
	// presumably it bounds a number of buffered messages — confirm with its users.
	MAXMESSAGE = 2048

	// BATCHSIZE is the number of regular commands contained in each block.
	// Per the original author's note, 4194 corresponds to "1M" and 8388 to
	// "2M" (presumably the resulting block size in bytes — confirm).
	BATCHSIZE = 4194
)

// TimeStatistic is a per-view timing record.
type TimeStatistic struct {
	// View is the view number the record belongs to.
	View uint32
	// StartTime and EndTime are the wall-clock instants at which handling of
	// the view began and ended.
	StartTime, EndTime time.Time
	// IsCommittee reports whether the party acted as a committee member
	// during this view.
	IsCommittee bool
}

// Experiment-wide state and settings.
var (
	// ViewChangedList is printed at the end of Run under the label
	// "Committee Changed At"; it is populated outside the visible code.
	ViewChangedList []uint32

	// PAYLOADSIZE is the size of a single request: 250 bytes.
	PAYLOADSIZE = 250

	// VIEWS is the total number of views the experiment runs for.
	VIEWS uint32 = 100
)

// Init prepares the party before the consensus loop starts. It allocates
// empty, pre-sized buffers for every message set and then launches two
// background goroutines: one handling Catchup messages and one handling
// NewVrfView messages. Both goroutines return once ctx is done.
func (p *HonestParty) Init(ctx context.Context) {
	// Every quorum-bound set is given the same initial capacity.
	quorumCap := p.QCsize

	// The VRF set is sized from lambda with 100 extra slots.
	p.nxtcom = make([]*protobuf.Vrf, 0, p.lambda+100)
	p.newVrfView = make([]*protobuf.NewVrfView, 0, quorumCap)
	p.newView = make([]*protobuf.NewView, 0, quorumCap)
	p.prepareVoteMsg = make([]*protobuf.PrepareVote, 0, quorumCap)
	p.precommitVoteMsg = make([]*protobuf.PrecommitVote, 0, quorumCap)
	p.commitVoteMsg = make([]*protobuf.CommitVote, 0, quorumCap)
	p.sttranMsg = make([]*protobuf.StateTransfer, 0, quorumCap)

	// Background handler for catchup messages.
	go p.MonitorCatchup(ctx)

	// Background handler for NewVrfView messages.
	go p.MonitorNewVrfView(ctx)
}

// MonitorNewVrfView loops until ctx is done, receiving "NewVrfView" messages
// for the party's current p.ViewVrf and handing each decoded message to
// OnReceiveNewVrfView. The channel is looked up again on every iteration, so
// a change of p.ViewVrf is picked up on the next pass.
func (p *HonestParty) MonitorNewVrfView(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case raw := <-p.GetMessage("NewVrfView", p.ViewVrf):
			decoded := network.Decapsulation("NewVrfView", raw)
			p.OnReceiveNewVrfView(decoded.(*protobuf.NewVrfView))
		}
	}
}

// MonitorCatchup loops until ctx is done, receiving "Catchup" messages from
// the channel keyed by the package-level catchuint32 value and handing each
// decoded message to OnReceiveCatchup.
func (p *HonestParty) MonitorCatchup(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case raw := <-p.GetMessage("Catchup", catchuint32):
			decoded := network.Decapsulation("Catchup", raw)
			p.OnReceiveCatchup(decoded.(*protobuf.Catchup))
		}
	}
}

// WaitBlock blocks until one "Block" message for p.ViewSt arrives, decodes it
// and passes the block to SaveBlock together with p.Blockstore. It has no
// timeout and cannot be cancelled.
func (p *HonestParty) WaitBlock() {
	raw := <-p.GetMessage("Block", p.ViewSt)
	block := network.Decapsulation("Block", raw).(*protobuf.Block)
	SaveBlock(p.Blockstore, block)
}

// Run executes the consensus main loop and prints latency statistics when it
// finishes.
//
// The loop continues while both p.View and p.ViewSt are below VIEWS. On each
// iteration the party takes one of two paths:
//
//   - If p.CurComBool[p.PID] is true (the party is a committee member) it
//     resets its per-view message sets, creates a context whose deadline is
//     timeout seconds multiplied by p.NewTimeout(), announces the new view,
//     processes messages through WaitProposal/EventLoop and then advances
//     p.View by one.
//   - Otherwise it waits for a block and for StateTransfer messages.
//
// timeout is the base per-view timeout in seconds. startTime is only used to
// report the total elapsed time at the end.
func (p *HonestParty) Run(timeout int, startTime time.Time) {

	// Start instants and measured latencies (in milliseconds), indexed by
	// view, for the committee path.
	latencyTimeStart := make([]time.Time, VIEWS)
	latency := make([]int64, VIEWS)

	// The same bookkeeping for the non-committee (state transfer) path.
	latencyStTimeStart := make([]time.Time, VIEWS)
	latencySt := make([]int64, VIEWS)

	// NOTE(review): statisticMap is filled in below but never read back or
	// printed inside this function.
	statisticMap := make(map[uint32]*TimeStatistic)
	var mapMutex sync.RWMutex

	for p.View < VIEWS && p.ViewSt < VIEWS {
		p.sttranMsg = p.sttranMsg[:0]

		if p.CurComBool[p.PID] {
			SaveView(p.Blockstore, p.View)

			// Reset the per-view message sets, keeping their backing arrays.
			p.nxtcom = p.nxtcom[:0]
			p.newView = p.newView[:0]
			p.prepareVoteMsg = p.prepareVoteMsg[:0]
			p.precommitVoteMsg = p.precommitVoteMsg[:0]
			p.commitVoteMsg = p.commitVoteMsg[:0]

			// Per-view deadline: the base timeout scaled by the multiplier
			// returned from NewTimeout.
			to := p.NewTimeout()
			timer := time.Duration(timeout) * time.Second * time.Duration(to)
			ctx := context.Background()
			ctxWithTimeout, cancel := context.WithTimeout(ctx, timer)

			// Latency
			latencyTimeStart[p.View] = time.Now()

			mapMutex.Lock()
			statisticMap[p.View] = &TimeStatistic{
				View:        p.View,
				StartTime:   time.Now(),
				EndTime:     time.Now(),
				IsCommittee: true,
			}
			mapMutex.Unlock()

			// Repeated timeout: once the multiplier reaches 16, ask for
			// catchup and wait for a matching History message. WaitHistory
			// cancels the view context when it receives one, so the steps
			// below then return immediately.
			if to >= 1<<4 {
				v := p.bcommitrefer.View
				c := p.Com
				p.SendCatchup()
				p.WaitHistory(v, c, ctxWithTimeout, cancel)
			}

			// Can be put in the beginning of each view
			p.SendNewVrfView()
			p.SendNewView(p.GetLeader(p.View))

			// Monitor vrf messages for this view in the background.
			go p.WaitVrf(ctxWithTimeout, p.View)

			// Wait to trigger committee rotation
			if p.View == 20 {
				time.Sleep(200 * time.Millisecond)
			}

			// Run the event loop to handle the different consensus messages.
			// A non-leader first needs a valid proposal from the leader.
			if p.PID != p.GetLeader(p.View) {
				proposalreceived := p.WaitProposal(ctxWithTimeout)
				if proposalreceived {
					p.EventLoop(ctxWithTimeout, cancel)
				}
			} else {
				p.EventLoop(ctxWithTimeout, cancel)
			}

			// Release this view's context now instead of deferring it: a
			// defer inside the loop would only run when Run returns, keeping
			// every view's timer (and possibly its WaitVrf goroutine) alive
			// in the meantime. Calling cancel more than once is harmless.
			cancel()

			latency[p.bcommitrefer.View] = time.Since(latencyTimeStart[p.bcommitrefer.View]).Milliseconds()

			// Guard the lookup the same way the non-committee branch does, so
			// a missing entry cannot cause a nil dereference.
			mapMutex.Lock()
			if stat := statisticMap[p.View]; stat != nil {
				stat.EndTime = time.Now()
			}
			mapMutex.Unlock()
			p.View += 1
		} else {
			// Latency
			latencyStTimeStart[p.ViewSt] = time.Now()

			// NOTE(review): this record is keyed by p.View although the
			// latency slices of this branch are indexed by p.ViewSt — confirm
			// which one is intended.
			mapMutex.Lock()
			statisticMap[p.View] = &TimeStatistic{
				View:        p.View,
				StartTime:   time.Now(),
				EndTime:     time.Now(),
				IsCommittee: false,
			}
			mapMutex.Unlock()

			// p.ViewSt is not advanced here; presumably the handlers reached
			// from these two calls do it — confirm.
			p.WaitBlock()
			p.WaitStateTransfer()

			// Read and update the entry under the same lock as every other
			// access to the map.
			mapMutex.Lock()
			if stat := statisticMap[p.bcommitrefer.View]; stat != nil {
				stat.EndTime = time.Now()
			}
			mapMutex.Unlock()
			latencySt[p.bcommitrefer.View] = time.Since(latencyStTimeStart[p.bcommitrefer.View]).Milliseconds()
		}
		runtime.GC()
	}

	fmt.Println("PID: ", p.PID, " Latency: ")
	// The Marshal errors are ignored on purpose: encoding a []int64 cannot fail.
	latencyBytes, _ := json.Marshal(latency)
	fmt.Println(string(latencyBytes))
	latencyStBytes, _ := json.Marshal(latencySt)
	fmt.Println(string(latencyStBytes))
	fmt.Println("Committee Changed At ", ViewChangedList)
	fmt.Println("Consensus used time : ", time.Since(startTime))
	time.Sleep(3 * time.Second)
}

// WaitStateTransfer consumes "StateTransfer" messages for p.ViewSt and feeds
// them to OnReceiveSt until p.sttranMsg holds at least p.QCsize entries. It
// returns right after the handler call that reaches that size. Messages that
// arrive while the set is already full are received and dropped. The method
// has no timeout and cannot be cancelled.
func (p *HonestParty) WaitStateTransfer() {
	for {
		raw := <-p.GetMessage("StateTransfer", p.ViewSt)

		// Already at quorum size: drop the message and keep receiving.
		if len(p.sttranMsg) >= int(p.QCsize) {
			continue
		}

		p.OnReceiveSt(network.Decapsulation("StateTransfer", raw).(*protobuf.StateTransfer))
		if len(p.sttranMsg) >= int(p.QCsize) {
			return
		}
	}
}

// WaitHistory waits for a "History" message of the given view whose Com field
// equals com. The first matching message is decoded and passed to
// OnReceiveHist, after which cancel is called and the method returns.
// Messages with a different Com are skipped. The method also returns as soon
// as ctx is done.
func (p *HonestParty) WaitHistory(view uint32, com uint32, ctx context.Context, cancel context.CancelFunc) {
	for {
		select {
		case <-ctx.Done():
			return
		case raw := <-p.GetMessage("History", view):
			if raw.Com != com {
				continue
			}
			p.OnReceiveHist(network.Decapsulation("History", raw).(*protobuf.History))
			cancel()
			return
		}
	}
}

// WaitProposal waits for a single "Proposal" message of the current view.
// It returns true after handing the proposal to OnReceiveProposal, which
// happens only when the message was sent by the leader of p.View and carries
// the party's current Com. It returns false when ctx is done first or when
// the received message fails that check.
//
// NOTE(review): only one message is examined, so a single non-matching
// message ends the wait; WaitVrf and WaitHistory keep looping in that
// situation — confirm this difference is intended.
func (p *HonestParty) WaitProposal(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return false
	case raw := <-p.GetMessage("Proposal", p.View):
		if raw.Sender != p.GetLeader(p.View) || raw.Com != p.Com {
			return false
		}
		p.OnReceiveProposal(network.Decapsulation("Proposal", raw).(*protobuf.Proposal))
		return true
	}
}

// WaitVrf receives "Vrf" messages for the given view until ctx is done.
// Messages whose Com equals the party's current p.Com are decoded and passed
// to OnReceiveVrf; all others are ignored.
func (p *HonestParty) WaitVrf(ctx context.Context, view uint32) {
	for {
		select {
		case <-ctx.Done():
			return
		case raw := <-p.GetMessage("Vrf", view):
			if raw.Com != p.Com {
				continue
			}
			p.OnReceiveVrf(network.Decapsulation("Vrf", raw).(*protobuf.Vrf))
		}
	}
}

// EventLoop dispatches the consensus messages of the current view to their
// handlers until the view ends or ctx is done.
//
// NewView messages (read from the channel of view p.View-1) and the three
// kinds of vote messages are processed only when this party is the leader of
// p.View, the message's Com matches p.Com and the corresponding message set
// holds fewer than p.QCsize entries; votes must additionally come from a
// sender flagged in p.CurComBool. The QC messages are accepted only from the
// leader of p.View with a matching Com.
//
// The loop ends, calling cancel first, when p.commitVoteMsg has reached
// p.QCsize entries after a CommitVote was received, or when a non-leader has
// processed a valid CommitQCSend.
func (p *HonestParty) EventLoop(ctx context.Context, cancel context.CancelFunc) {
	// Both helpers re-read the party's fields on every call.
	iAmLeader := func() bool { return p.PID == p.GetLeader(p.View) }
	quorum := func() int { return int(p.QCsize) }

	for {
		select {
		case <-ctx.Done():
			return

		case msg := <-p.GetMessage("NewView", p.View-1):
			if !iAmLeader() || msg.Com != p.Com || len(p.newView) >= quorum() {
				continue
			}
			p.OnReceiveNewView(network.Decapsulation("NewView", msg).(*protobuf.NewView))

		case msg := <-p.GetMessage("PrepareVote", p.View):
			if len(p.prepareVoteMsg) >= quorum() || !p.CurComBool[msg.Sender] || msg.Com != p.Com || !iAmLeader() {
				continue
			}
			p.OnReceivePrepareVote(network.Decapsulation("PrepareVote", msg).(*protobuf.PrepareVote))

		case msg := <-p.GetMessage("PrecommitVote", p.View):
			if len(p.precommitVoteMsg) >= quorum() || !p.CurComBool[msg.Sender] || msg.Com != p.Com || !iAmLeader() {
				continue
			}
			p.OnReceivePrecommitVote(network.Decapsulation("PrecommitVote", msg).(*protobuf.PrecommitVote))

		case msg := <-p.GetMessage("CommitVote", p.View):
			if len(p.commitVoteMsg) < quorum() && p.CurComBool[msg.Sender] && msg.Com == p.Com && iAmLeader() {
				p.OnReceiveCommitVote(network.Decapsulation("CommitVote", msg).(*protobuf.CommitVote))
			}
			// The size check runs whether or not the vote was accepted.
			if len(p.commitVoteMsg) < quorum() {
				continue
			}
			cancel()
			return

		case msg := <-p.GetMessage("PrepareQCSend", p.View):
			if msg.Com != p.Com || msg.Sender != p.GetLeader(p.View) {
				continue
			}
			p.OnReceivePrepareQCSend(network.Decapsulation("PrepareQCSend", msg).(*protobuf.PrepareQCSend))

		case msg := <-p.GetMessage("PrecommitQCSend", p.View):
			if msg.Com != p.Com || msg.Sender != p.GetLeader(p.View) {
				continue
			}
			p.OnReceivePrecommitQCSend(network.Decapsulation("PrecommitQCSend", msg).(*protobuf.PrecommitQCSend))

		case msg := <-p.GetMessage("CommitQCSend", p.View):
			// Only a non-leader acts on the leader's commit QC.
			if iAmLeader() || msg.Com != p.Com || msg.Sender != p.GetLeader(p.View) {
				continue
			}
			p.OnReceiveCommitQCSend(network.Decapsulation("CommitQCSend", msg).(*protobuf.CommitQCSend))
			cancel()
			return
		}
	}
}

// PrintNxtcom sorts p.nxtcom in place by ascending Rho (byte-wise
// comparison) and prints, in that order, the Id, the first byte of Rho and
// the View of every entry, prefixed with the party's PID.
//
// An entry with an empty Rho is printed with 0 as its first byte instead of
// causing an index-out-of-range panic.
func (p *HonestParty) PrintNxtcom() {
	sort.Slice(p.nxtcom, func(i, j int) bool {
		return bytes.Compare(p.nxtcom[i].Rho, p.nxtcom[j].Rho) < 0
	})

	ids := make([]uint32, 0, len(p.nxtcom))
	rhos := make([]byte, 0, len(p.nxtcom))
	views := make([]uint32, 0, len(p.nxtcom))
	for _, vrf := range p.nxtcom {
		ids = append(ids, vrf.Id)

		// Placeholder 0 keeps the three slices aligned when Rho is empty.
		var first byte
		if len(vrf.Rho) > 0 {
			first = vrf.Rho[0]
		}
		rhos = append(rhos, first)

		views = append(views, vrf.View)
	}
	fmt.Println("PID: ", p.PID, ", Ids: ", ids, ", Rhos: ", rhos, ", Views: ", views)
}