
Commit 51d8f3f2 authored by Allen Guan

Merge branch 'v1.1.0_dev' of https://git.code.tencent.com/ChainWeaver/mira/mira-mpc-engine into v1.1.0_dev
parents fd98cbc8 9b38ae1a
exe_local/exp.py:

import uuid

import asset_platform  # not referenced below
from file_ctl.builder import StorageBuilder
from py_sdk import PySdk
from pyspark.sql.functions import sum, count, max, min, avg  # unused here; note these shadow Python builtins


class Exp:
    """Executes a LOCALEXP task: evaluates an arithmetic expression over source columns."""

    def __init__(self, sdk: PySdk):
        self.sdk = sdk

    def task_run(self, task):
        self.sdk.logger.info("============ compute party starts executing the task")
        self.sdk.logger.info(f"task from scheduler: {str(task)}")
        task_input = task.get("input").get("data")[0]
        asset_en_name = task_input.get("assetName")
        params = task.get("module").get("params")

        self.sdk.logger.info("============ compute party reads source data")
        chain_info_id = task.get("chainInfoId")
        mysql_url, mysql_prop, table_name = self.sdk.get_source_conn_info(asset_en_name, chain_info_id)
        self.sdk.logger.info(f"asset_en_name: {asset_en_name}")
        self.sdk.logger.info(f"mysql_url: {mysql_url}")
        self.sdk.logger.info(f"mysql_prop: {mysql_prop}")
        self.sdk.logger.info(f"table_name: {table_name}")
        df = self.sdk.spark.read.jdbc(url=mysql_url, table="`" + table_name + "`", properties=mysql_prop)

        # Build the SQL select expression from the expression template.
        func = params.get("function")
        if func != "base":
            self.sdk.logger.error(f"function {func} is not supported yet")

        exp = params.get("expression")
        constant = params.get("constant")
        c_list = constant.split(",") if constant else []

        # Collect one "table.column" reference per input; these fill the "x"
        # placeholders of the expression in order.
        x_list = []
        inputs = task.get("input").get("data")
        for item in inputs:
            asset_en_name = item["assetName"]
            column_name = item.get("params").get("field")
            table_name = item.get("dataID")
            self.sdk.logger.info(f"asset_en_name: {asset_en_name}")
            self.sdk.logger.info(f"table_name: {table_name}, column_name: {column_name}")
            x_list.append(f"{table_name}.{column_name}")

        # Substitute placeholders character by character: "x" consumes the next
        # column reference, "c" the next constant; every other character
        # (operators, parentheses) is copied through verbatim.
        expr = ""
        x_cnt = 0
        c_cnt = 0
        for val in exp:
            if val == "x":
                expr += x_list[x_cnt]
                x_cnt += 1
            elif val == "c":
                expr += c_list[c_cnt]
                c_cnt += 1
            else:
                expr += val

        # NOTE: only the last asset read is registered as a temp view, so the
        # generated SQL assumes the x references resolve against that one view.
        df.createOrReplaceTempView(asset_en_name)
        sql = f"select {expr} from {asset_en_name}"
        self.sdk.logger.info(f"execute sql: {sql}")
        df = self.sdk.spark.sql(sql)

        outputs = task.get("output").get("data")
        for output in outputs:
            if output.get("domainID") == self.sdk.party_id:
                output_id = str(uuid.uuid4()).replace("-", "")
                self.sdk.write_output_to_mysql(mysql_url, mysql_prop, output_id, df)
                self.sdk.logger.info("============ compute party finished computing, handling output")
                self.sdk.logger.info("output data name: " + output_id)
                data_name = output.get("dataName")
                final_result = output.get("finalResult")
                if final_result == "Y":
                    job_id = self.sdk.job_id
                    self.sdk.logger.info("============ saving the result as a file")
                    self.sdk.logger.info("file name: " + output_id)
                    t = df.collect()
                    self.sdk.save_to_local(output_id, str(t))
                    self.sdk.logger.info("============ uploading result file to object storage")
                    self.sdk.logger.info("bucket name: result")
                    self.sdk.logger.info("object name: " + job_id + "/" + output_id)
                    self.sdk.logger.info("object type: binary")
                    obj_storage = StorageBuilder(conf=self.sdk.object_storage).build()
                    obj_storage.fput_file(job_id + "/" + output_id, output_id)
                    self.sdk.logger.info("============ upload succeeded")
                self.sdk.logger.info("============ updating output info on chain")
                self.sdk.update_output_id(output_id, data_name, final_result)
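
The placeholder scheme above is easiest to see in isolation. Below is a minimal, self-contained sketch of the same substitution loop; build_expr and all column/constant values are invented for illustration and are not part of the engine's SDK.

def build_expr(template, x_list, c_list):
    # "x" consumes the next column reference, "c" the next constant;
    # any other character (operator, digit, parenthesis) passes through.
    out, x_i, c_i = "", 0, 0
    for ch in template:
        if ch == "x":
            out += x_list[x_i]
            x_i += 1
        elif ch == "c":
            out += c_list[c_i]
            c_i += 1
        else:
            out += ch
    return out

print(build_expr("(x+x)*c", ["t1.income", "t1.bonus"], ["0.8"]))
# -> (t1.income+t1.bonus)*0.8

One consequence of the single-character scan: multi-character tokens cannot appear literally in the template, so every column and constant must come in through x_list or c_list.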
-from exe_local import filter, join, query, agg
+from exe_local import filter, join, query, agg, exp
 from exe_psi import vole_psi
 from exe_spdz import spdz
@@ -40,6 +40,13 @@ class TaskRunner:
         runner = agg.Agg(self.sdk)
         runner.task_run(task)
+
+    def run_exp(self, task_status):
+        self.sdk.logger.info("start exp")
+        if task_status == "READY":
+            task = self.sdk.get_task_json()
+            runner = exp.Exp(self.sdk)
+            runner.task_run(task)

     def run_psi(self, task_status):
         self.sdk.logger.info("start psi")
         task = self.sdk.get_task_json()
@@ -75,6 +82,8 @@ class TaskRunner:
             self.run_join(task_status)
         elif task_type == "LOCALAGG":
             self.run_agg(task_status)
+        elif task_type == "LOCALEXP":
+            self.run_exp(task_status)
         elif task_type == "OTPSI":
             self.run_psi(task_status)
         elif task_type.startswith("MPC"):
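
For context, the new LOCALEXP branch routes the task JSON into Exp.task_run. The payload below is a hypothetical example reconstructed only from the fields that task_run reads (chainInfoId, module.params, input.data, output.data); it is not an official schema, and every value is invented.

task = {
    "chainInfoId": "chain-001",                  # hypothetical value
    "module": {
        "params": {
            "function": "base",                  # only "base" is handled
            "expression": "x*c",                 # "x"/"c" placeholder template
            "constant": "100",                   # comma-separated constants
        }
    },
    "input": {
        "data": [
            {
                "assetName": "demo_asset",       # hypothetical asset name
                "dataID": "demo_asset",          # table name, used as the SQL alias
                "params": {"field": "income"},   # column that fills an "x"
            }
        ]
    },
    "output": {
        "data": [
            {
                "domainID": "party-a",           # must match sdk.party_id to write
                "dataName": "exp_result",
                "finalResult": "Y",              # "Y" also uploads a result file
            }
        ]
    },
}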