/* Copyright (C) THL A29 Limited, a Tencent company. All rights reserved. SPDX-License-Identifier: Apache-2.0 */ // Package main 主程序入口 package main import ( "context" "flag" "fmt" "os" "sync" "time" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/chain" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/config" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/dao" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/sdk" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/server" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/svc" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/internal/util" "chainweaver.org.cn/chainweaver/ida/chain-service/v2/pb/chainpb" commonChain "chainweaver.org.cn/chainweaver/ida/common/v2/chain" commonGrpc "chainweaver.org.cn/chainweaver/ida/common/v2/grpc" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/service" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) var ( configFile = flag.String("f", "etc/chain-service.yaml", "the config file") // SubscribeFlag 全局订阅标记: chainInfoId + contractName -> true SubscribeFlag = sync.Map{} ) func main() { flag.Parse() // version 选项打印当前版本信息 args := flag.Args() if len(args) > 0 && args[0] == "version" { fmt.Println(internal.VersionInfo()) os.Exit(0) } var c config.Config conf.MustLoad(*configFile, &c) ctx := svc.NewServiceContext(c) register := func(grpcServer *grpc.Server) { chainpb.RegisterChainServer(grpcServer, server.NewChainServer(ctx)) // 在 dev、test 环境开启调试功能 if c.Mode == service.DevMode || c.Mode == service.TestMode { reflection.Register(grpcServer) } } s, err := commonGrpc.CreateGRPCServer(c.RpcServerConf, register, c.GrpcConf.CaCertFile, c.GrpcConf.ServerCertFile, c.GrpcConf.ServerKeyFile, c.GrpcConf.MaxRecvMsgSize, c.GrpcConf.MaxSendMsgSize) if err != nil { panic(fmt.Errorf("failed to CreateGRPCServer,error: %v", err)) } defer s.Stop() // 开启所有合约订阅 startSubscribe(ctx.SqlDB, c, &ctx.SDKClients) // 服务退出前释放所有的 sdk client defer func() { ctx.SDKClients.Range(func(key, value any) bool { // 获取 chainInfoId chainInfoId, ok := key.(int32) if !ok { logx.Errorf("key[ %v ] is not int32 in ctx.SDKClients", key) } // 获取 sdk client sdkClient, ok := value.(sdk.SCInterface) if !ok { logx.Errorf("value[ %v ] is not sdk.SCInterface in ctx.SDKClients", value) return true } // 停止 sdk client err = sdkClient.Stop() if err != nil { logx.Errorf("failed to stop sdkClient for chain %d before exit,err: %v", chainInfoId, err) } return true }) }() fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) logx.Infof("Starting rpc server at %s...\n", c.ListenOn) s.Start() } func startSubscribe(db *dao.SqlDB, conf config.Config, sdkClients *sync.Map) { go func() { for { ctx := context.Background() // 获取所有链列表 chainList, err := db.ChainInfo.GetAllChains(ctx) if err != nil { logx.Errorf("failed to GetAllChains,err: %v", err) time.Sleep(time.Second * 3) continue } for _, ch := range chainList { // 获取合约列表 contractList, err2 := db.ContractInfo.GetAllContracts(ctx, ch.ID) if err2 != nil { logx.Errorf("failed to GetAllContracts,err: %v", err2) time.Sleep(time.Second * 3) continue } // 检查区块链节点是否已经配置 if len(ch.NodeConfs) == 0 { logx.Errorf("chain %s has not config nodeConfs", fmt.Sprintf("%s-%d", ch.ChainId, ch.ID)) time.Sleep(time.Second * 3) continue } // 转成订阅所需的区块链和节点配置结构 nodeCons := make([]commonChain.NodeConf, len(ch.NodeConfs)) for i, n := range ch.NodeConfs { nodeCons[i] = commonChain.NodeConf{ Url: n.GrpcUrl, EnableTls: n.EnableTls, TlsHostName: n.TlsHostName, CaCert: n.CaCertCipher, } } // 订阅多个合约事件 for _, c := range contractList { // 如果合约名称还没配置,则跳过 if c.ContractName == "" { continue } // 是否要订阅合约事件 if !c.EnableSubscribe { continue } // 检查该链是否已经初始化好 sdk client,如果没有则创建新的 sdkClient, err3 := util.CheckSDKClientConnection(db, sdkClients, int32(ch.ID), logx.WithContext(ctx), conf.Log.Path, conf.FtpConfig.ConfFilePath, conf.FtpConfig.Enable) if err3 != nil { logx.Errorf("failed to CheckSDKClientConnection for chain %s before subscribe contract %s event,err: %v", fmt.Sprintf("%s-%d", ch.ChainId, ch.ID), c.ContractName, err3) continue } go func(contractName, chainId, subChainId, chainMode, hashType, orgId, userTlsCert, userTlsKey string, chainInfoId int, conf config.Config, nodeCons []commonChain.NodeConf, sdkClient sdk.SCInterface) { // 检查是否重复订阅 val, ok := SubscribeFlag.Load(fmt.Sprintf("%d-%s", chainInfoId, contractName)) if ok && val.(bool) { logx.Debugf("[chain: %s-%d] [contractName: %s] already subscribed", chainId, chainInfoId, contractName) return } // 否则标记为已订阅 SubscribeFlag.Store(fmt.Sprintf("%d-%s", chainInfoId, contractName), true) // 开始订阅 err2 = chain.StartSubscribe(conf, chainId, subChainId, contractName, chainMode, hashType, orgId, userTlsCert, userTlsKey, nodeCons, chainInfoId, sdkClient) if err2 != nil { logx.Errorf("failed to subscribe chain %s contract %s event,err: %v", fmt.Sprintf("%s-%d", chainId, chainInfoId), contractName, err2) // 订阅失败,则取消标记 SubscribeFlag.Store(fmt.Sprintf("%d-%s", chainInfoId, contractName), false) } }(c.ContractName, ch.ChainId, ch.SubChainId, ch.ChainMode, ch.HashType, ch.OrgId, ch.UserTlsCertCipher, ch.UserTlsKeyCipher, int(ch.ID), conf, nodeCons, sdkClient) } } // 每隔3秒重新检查一下所有链的订阅 time.Sleep(time.Second * 3) } }() }