/* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. SPDX-License-Identifier: Apache-2.0 */ // Package event - package event import "C" import ( "context" "fmt" "time" "chainmaker.org/chainmaker/pb-go/v2/common" chainCli "chainweaver.org.cn/chainweaver/ida/chain-service/v2/chain" "chainweaver.org.cn/chainweaver/ida/common/v2/chain" "chainweaver.org.cn/chainweaver/ida/common/v2/event" "chainweaver.org.cn/chainweaver/ida/registration-service/v2/internal/code" "chainweaver.org.cn/chainweaver/ida/registration-service/v2/internal/model" "chainweaver.org.cn/chainweaver/ida/registration-service/v2/internal/svc" "chainweaver.org.cn/chainweaver/ida/registration-service/v2/internal/utils" "github.com/Rican7/retry" "github.com/Rican7/retry/strategy" "github.com/zeromicro/go-zero/core/logx" "gorm.io/gorm" ) var ( // defaultGroupName 订阅组信息 defaultGroupName = "registration" // 注册中心 // consumer 消费者 // 通过配置不相同的consumer,支持负载 defaultConsumer = "registration_consumer" // retryChainListInterval 查询链列表的时间间隔 retryChainListInterval = time.Duration(3) * time.Second // retryContractAndPlatformInfoInterval listenChainEvent中查询合约信息和平台信息的时间间隔 retryContractAndPlatformInfoInterval = time.Duration(10) * time.Second ) // eventManager 事件管理器 // chainConfigManager 链配置管理器,每隔一段时间检查链配置变更 // redisClient 连接事件redis订阅 // configChangeC 链配置变更channel type eventManager struct { svcCtx *svc.ServiceContext eventCtx context.Context chainConfigManager *chainConfigManager redisClient *event.RedisClient groupName string configChangeC chan struct{} logx.Logger } // NewEventManager 实例化事件管理器 // //nolint:golint func NewEventManager(svcCtx *svc.ServiceContext) *eventManager { // 初始化时间redis客户端 addr := fmt.Sprintf("%s:%d", svcCtx.Config.EventConf.Host, svcCtx.Config.EventConf.Port) client, err := event.NewRedisClient(addr, "", svcCtx.Config.EventConf.Password) if err != nil { panic(err) } return &eventManager{ svcCtx: svcCtx, redisClient: client, groupName: defaultGroupName, configChangeC: make(chan struct{}, 1), Logger: logx.WithContext(context.Background()), } } // Process 事件处理器 // 从事件redis订阅中,subscribe合约事件 func (e *eventManager) Process() { e.Logger.Infof("[event] start process event") // 阻塞加载链服务配置信息 // 如果加载不出来,则3秒后重新尝试 // 直到正确获取链配置 func() { for { if e.svcCtx.ChainServiceClient == nil { e.Logger.Error("[event] chain client is nil") time.Sleep(retryChainListInterval) continue } // 初始化链配置管理器,并启动 e.chainConfigManager = NewChainConfigManager(e.svcCtx, e.configChangeC, e.Logger) go e.chainConfigManager.Run() return } }() // 从配置文件加载合约订阅组名称 if e.svcCtx.Config.EventConf.GroupName != "" { e.groupName = e.svcCtx.Config.EventConf.GroupName } e.Logger.Infof("start event: groupName[%s]", e.groupName) // 启动事件监听 var cancel context.CancelFunc e.eventCtx, cancel = context.WithCancel(context.Background()) go e.processEvent() //nolint:gosimple for { select { case <-e.configChangeC: // 收到配置变更信号,发送取消信号 cancel() // 等待旧的goroutine退出 <-e.eventCtx.Done() // 创建新的context和cancel e.eventCtx, cancel = context.WithCancel(context.Background()) // 启动新的事件处理 go e.processEvent() } } } // processEvent 启动事件监听 func (e *eventManager) processEvent() { if len(e.chainConfigManager.chainConfig) == 0 { panic("chain config is empty") } // 变量链配置,对每一个链启动独立的事件监听 for _, chainConfig := range e.chainConfigManager.chainConfig { go e.listenChainEvent(e.eventCtx, chainConfig) } } // listenChainEvent 监听每一个链自己的事件 func (e *eventManager) listenChainEvent(ctx context.Context, chainConfig *chainCli.ChainInfoData) { e.Logger.Infof("[event] start listen chain event, chaininfoid:[%d]", chainConfig.Id) // 查询当前链的合约信息 req := &chainCli.GetContractInfoRequest{ RequestId: "event-request-id", ChainInfoId: chainConfig.Id, ContractType: chain.ContractTypeIda, } var ( contractInfo *chainCli.GetContractInfoResponse err error ) // 尝试获取合约信息,如果失败则重试,每隔10s重试一次。 err = retry.Retry(func(uint) error { contractInfo, err = e.chainConfigManager.srvCtx.ChainServiceClient.GetContractInfo(context.Background(), req) if err != nil || contractInfo.Code != int32(code.Success) || contractInfo.Data == nil || len(contractInfo.Data.Name) == 0 { e.Logger.Errorf("[event] get contract info failed from chain service, err:%s", err) return fmt.Errorf("GetContractInfo err:%v", err) } return nil }, strategy.Wait(retryContractAndPlatformInfoInterval), ) if err != nil { e.Logger.Errorf("[event] get contract info failed from chain service, err:%s", err) return } // 设置当前链的合约 //code.ContactNameLocker.Lock() //defer code.ContactNameLocker.Unlock() //code.ContractName[chainConfig.Id] = contractInfo.Data.Name //e.Logger.Infof("[event] set contract name[%s],code.ContractName[%v] ", contractInfo.Data.Name, code.ContractName) // 查询当前链的平台信息, 如果失败则重试,每隔10s重试一次 err = retry.Retry(func(uint) error { if err = e.initChainPlatform(chainConfig.Id); err != nil { e.Logger.Errorf("[event] init chain platform err:%s", err) return err } return nil }, strategy.Wait(retryContractAndPlatformInfoInterval), ) if err != nil { e.Logger.Errorf("[event] init chain platform failed, err:%s", err) return } // eventHandlers 事件处理器集合 eventHandlers := []handler{ // 资产创建事件处理器 NewAssetCreateEventHandler(e.svcCtx.GDB, e.svcCtx, e.Logger, chainConfig.Id), // 资产更新事件处理器 NewAssetUpdateEventHandler(e.svcCtx.GDB, e.svcCtx, e.Logger, chainConfig.Id), // 资产删除事件处理器 NewAssetDeleteEventHandler(e.svcCtx.GDB, e.Logger, chainConfig.Id), // 认证处理事件处理器 NewCertificateEventHandler(e.svcCtx.GDB, e.Logger, chainConfig.Id), // 认证申请事件处理器 NewCertificateAppliedEventHandler(e.svcCtx.GDB, e.Logger, chainConfig.Id), // 新企业创建事件处理器 NewUserCreatedEventHandler(e.svcCtx.GDB, e.Logger, chainConfig.Id), // 企业用户修改事件处理器 NewUserModifyEventHandler(e.svcCtx.GDB, e.Logger, chainConfig.Id), // 平台注册事件处理器 NewPlatformRegisterHandler(e.svcCtx.GDB, e.Logger, chainConfig.Id), } dispatcher := NewHandlerDispatcher(chainConfig.Id, eventHandlers, e.Logger) // 异步协程订阅事件 go func() { e.Logger.Infof("[event] subscribe redis, ChainId:%s, SubChainId:%s, contractName:%s", chainConfig.ChainId, chainConfig.SubChainId, contractInfo.Data.Name) err = e.redisClient.SubscribeToStream(ctx, chainConfig.ChainId, chainConfig.SubChainId, contractInfo.Data.Name, e.groupName, defaultConsumer, int(chainConfig.Id), dispatcher.dispatchTopicHandler) logx.WithContext(ctx).Errorf("subscribe to steam error: %s", err) }() } // NewHandlerDispatcher 实例化handler分发器 // //revive:ignore func NewHandlerDispatcher(chainInfoId int32, handlers []handler, logger logx.Logger) *handlerDispatcher { handlersMap := make(map[string]handler, len(handlers)) for _, h := range handlers { handlersMap[h.eventName()] = h } return &handlerDispatcher{ chainInfoId: chainInfoId, handlersMap: handlersMap, Logger: logger, } } // handlerDispatcher 事件分发器 type handlerDispatcher struct { chainInfoId int32 handlersMap map[string]handler logx.Logger } // dispatchTopicHandler 事件分发 func (h *handlerDispatcher) dispatchTopicHandler(eventInfo *common.ContractEventInfo) { //nolint:golint h.Logger.Infof("[event] dispatch event, %s", eventInfo) if eventInfo == nil { return } if eventHandler, exist := h.handlersMap[eventInfo.Topic]; exist { eventHandler.handleEvent(eventInfo) } } // 事件处理器接口定义 // consumer()string: 返回消费者名称,也就是事件名称 // handleEvent(*common.ContractEventInfo) error 事件处理 type handler interface { eventName() string handleEvent(*common.ContractEventInfo) } // initChainPlatform 初始化当前平台信息 func (e *eventManager) initChainPlatform(chainInfoId int32) error { platform, err := utils.GetPlatformInfo(e.svcCtx.ChainServiceClient, chainInfoId) if err != nil { e.Logger.Errorf("[event] get current chainInfoId[%d] platform info err:%s", chainInfoId, err) return err } // 查询是否已经存在 err = e.svcCtx.GDB.Where("chain_info_id = ?", chainInfoId).First(&model.ChainPlatform{}).Error if err == gorm.ErrRecordNotFound { chainPlatform := &model.ChainPlatform{ ChainInfoId: chainInfoId, PlatformId: int32(platform.PlatformId), CreatedAt: time.Now(), } if err = e.svcCtx.GDB.Create(chainPlatform).Error; err != nil { return err } } return nil }