新注册的用户请输入邮箱并保存,随后登录邮箱激活账号。后续可直接使用邮箱登录!

Commit a7885b2a authored by zan xiaopeng's avatar zan xiaopeng

fix psi result error

parent 51d8f3f2
......@@ -31,12 +31,14 @@ class VolePsi:
asset_en_name, table_name, column_name, index, datasource_no="", "", "", 0, ""
for input_data in task.get("input").get("data"):
domain_id = input_data.get("domainID")
self.sdk.logger.info(f"dominID {domain_id}, org_id {self.sdk.party_id}")
self.sdk.logger.info(f"dominID {domain_id}, party_id {self.sdk.party_id}")
if input_data.get("domainID") == self.sdk.party_id:
# asset_en_name = input_data.get("assetEnName")
asset_en_name = input_data.get("assetName")
# table_name = input_data.get("params").get("table")
column_name = input_data.get("params").get("field")
task_src = input_data.get("taskSrc")
mid_uuid_table = input_data.get("dataID")
if input_data.get("role") == "server":
index = 0
else:
......@@ -44,8 +46,18 @@ class VolePsi:
self.sdk.logger.info("计算节点编号:" + str(index))
self.sdk.logger.info("============计算方开始处理输入数据")
self.sdk.logger.info("资产名:" + asset_en_name)
# input_data_list = self.sdk.read_input_from_mysql(asset_en_name, column_name, chain_info_id)
mysql_url, mysql_prop, table_name = self.sdk.get_source_conn_info(asset_en_name, chain_info_id)
if task_src:
table_name = mid_uuid_table
df = self.spark.read.jdbc(url=mysql_url, table="`" + table_name + "`", properties=mysql_prop)
input_data_list = df.select(column_name).rdd.flatMap(lambda x: x).collect()
self.sdk.logger.info("表名:" + table_name)
self.sdk.logger.info("字段名:" + column_name)
input_data_list = self.sdk.read_input_from_mysql(asset_en_name, column_name, chain_info_id)
self.sdk.logger.info(f"sample data: ")
df.show()
file_name = task.get("jobInstanceID") + task.get("taskName")
self.sdk.logger.info(f"input data len: {len(input_data_list)}")
self.sdk.logger.info(f"data[0:100]: {str(input_data_list[0:100])}")
......@@ -79,10 +91,10 @@ class VolePsi:
self.sdk.logger.info(f"============部分求交结果: {str(res[0:100])}")
self.sdk.logger.info("============求交完成,开始处理输出")
mysql_url, mysql_prop, table_name = self.sdk.get_source_conn_info(asset_en_name, chain_info_id)
df = self.sdk.spark.read.jdbc(url=mysql_url, table="`" + table_name + "`", properties=mysql_prop)
self.sdk.logger.info(f"sample data: ")
df.show()
# mysql_url, mysql_prop, table_name = self.sdk.get_source_conn_info(asset_en_name, chain_info_id)
# df = self.sdk.spark.read.jdbc(url=mysql_url, table="`" + table_name + "`", properties=mysql_prop)
# self.sdk.logger.info(f"sample data: ")
# df.show()
outputs = task.get("output").get("data")
for output in outputs:
if output.get("domainID") == self.sdk.party_id:
......
......@@ -148,8 +148,9 @@ class PySdk():
# df = self.spark.read.jdbc(url=self.mysql_url, table="`" + table_name + "`", properties=self.mysql_prop)
# data = df.select(column_name).rdd.flatMap(lambda x: x).collect()
# return data
mysql_url, mysql_prop, table_name = self.get_source_conn_info(asset_en_name, chain_info_id)
self.logger.info(f"asset_en_name: {asset_en_name}")
self.logger.info(f"chain_info_id: {chain_info_id}")
mysql_url, mysql_prop, table_name = self.get_source_conn_info(asset_en_name, chain_info_id)
self.logger.info(f"mysql_url: {mysql_url}")
self.logger.info(f"mysql_prop: {mysql_prop}")
self.logger.info(f"table_name: {table_name}, column_name: {column_name}")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment