#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from abc import ABCMeta
from abc import abstractmethod
from typing import Any
from typing import List
from typing import Optional
from typing import Union

from gs_interactive.api import AdminServiceGraphManagementApi
from gs_interactive.api import AdminServiceJobManagementApi
from gs_interactive.api import AdminServiceProcedureManagementApi
from gs_interactive.api import AdminServiceServiceManagementApi
from gs_interactive.api import GraphServiceEdgeManagementApi
from gs_interactive.api import GraphServiceVertexManagementApi
from gs_interactive.api import QueryServiceApi
from gs_interactive.api import UtilsApi
from gs_interactive.api_client import ApiClient
from gs_interactive.configuration import Configuration
from pydantic import Field
from pydantic import StrictBytes
from pydantic import StrictStr
from typing_extensions import Annotated

from gs_interactive.client.generated.results_pb2 import CollectiveResults
from gs_interactive.client.result import Result
from gs_interactive.client.status import Status
from gs_interactive.client.status import StatusCode
from gs_interactive.client.utils import InputFormat
from gs_interactive.client.utils import append_format_byte
from gs_interactive.models import CreateGraphRequest
from gs_interactive.models import CreateGraphResponse
from gs_interactive.models import CreateProcedureRequest
from gs_interactive.models import CreateProcedureResponse
from gs_interactive.models import EdgeRequest
from gs_interactive.models import GetGraphResponse
from gs_interactive.models import GetGraphSchemaResponse
from gs_interactive.models import GetGraphStatisticsResponse
from gs_interactive.models import GetProcedureResponse
from gs_interactive.models import JobResponse
from gs_interactive.models import JobStatus
from gs_interactive.models import QueryRequest
from gs_interactive.models import SchemaMapping
from gs_interactive.models import ServiceStatus
from gs_interactive.models import StartServiceRequest
from gs_interactive.models import StopServiceRequest
from gs_interactive.models import UpdateProcedureRequest
from gs_interactive.models import UploadFileResponse
from gs_interactive.models import VertexData
from gs_interactive.models import VertexEdgeRequest
from gs_interactive.models import VertexRequest


class EdgeInterface(metaclass=ABCMeta):
    """Abstract interface declaring the edge operations of a session."""

    @abstractmethod
    def add_edge(
        self, graph_id: StrictStr, edge_request: List[EdgeRequest]
    ) -> Result[str]:
        """Add the given edges to the graph identified by ``graph_id``."""
        raise NotImplementedError

    @abstractmethod
    def delete_edge(
        self,
        graph_id: StrictStr,
        src_label: Annotated[
            StrictStr, Field(description="The label name of src vertex.")
        ],
        src_primary_key_value: Annotated[
            Any, Field(description="The primary key value of src vertex.")
        ],
        dst_label: Annotated[
            StrictStr, Field(description="The label name of dst vertex.")
        ],
        dst_primary_key_value: Annotated[
            Any, Field(description="The primary key value of dst vertex.")
        ],
    ) -> Result[str]:
        """Delete the edge between the given source and destination vertices."""
        raise NotImplementedError

    @abstractmethod
    def get_edge(
        self,
        graph_id: StrictStr,
        edge_label: Annotated[StrictStr, Field(description="The label name of edge.")],
        src_label: Annotated[
            StrictStr, Field(description="The label name of src vertex.")
        ],
        src_primary_key_value: Annotated[
            Any, Field(description="The primary key value of src vertex.")
        ],
        dst_label: Annotated[
            StrictStr, Field(description="The label name of dst vertex.")
        ],
        dst_primary_key_value: Annotated[
            Any, Field(description="The primary key value of dst vertex.")
        ],
    ) -> Result[Union[None, EdgeRequest]]:
        """Fetch the edge with the given label between the given vertices."""
        raise NotImplementedError

    @abstractmethod
    def update_edge(
        self, graph_id: StrictStr, edge_request: EdgeRequest
    ) -> Result[str]:
        """Update an edge of the graph identified by ``graph_id``."""
        raise NotImplementedError


class VertexInterface(metaclass=ABCMeta):
    """Abstract interface declaring the vertex operations of a session."""

    @abstractmethod
    def add_vertex(
        self,
        graph_id: StrictStr,
        vertex_edge_request: VertexEdgeRequest,
    ) -> Result[StrictStr]:
        """Add the content of ``vertex_edge_request`` to the graph."""
        raise NotImplementedError

    @abstractmethod
    def delete_vertex(
        self,
        graph_id: StrictStr,
        label: Annotated[StrictStr, Field(description="The label name of vertex.")],
        primary_key_value: Annotated[
            Any, Field(description="The primary key value of vertex.")
        ],
    ) -> Result[str]:
        """Delete the vertex with the given label and primary key value."""
        raise NotImplementedError

    @abstractmethod
    def get_vertex(
        self,
        graph_id: StrictStr,
        label: Annotated[StrictStr, Field(description="The label name of vertex.")],
        primary_key_value: Annotated[
            Any, Field(description="The primary key value of vertex.")
        ],
    ) -> Result[VertexData]:
        """Fetch the vertex with the given label and primary key value."""
        raise NotImplementedError

    @abstractmethod
    def update_vertex(
        self, graph_id: StrictStr, vertex_request: VertexRequest
    ) -> Result[str]:
        """Update a vertex of the graph identified by ``graph_id``."""
        raise NotImplementedError


class GraphInterface(metaclass=ABCMeta):
    """Abstract interface declaring the graph management operations."""

    @abstractmethod
    def create_graph(self, graph: CreateGraphRequest) -> Result[CreateGraphResponse]:
        """Create a graph from the given request."""
        raise NotImplementedError

    # NOTE: the four methods below were previously declared without ``self``,
    # which made their signatures disagree with every implementation.
    @abstractmethod
    def get_graph_schema(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to get")],
    ) -> Result[GetGraphSchemaResponse]:
        """Fetch the schema of the graph identified by ``graph_id``."""
        raise NotImplementedError

    @abstractmethod
    def get_graph_meta(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to get")],
    ) -> Result[GetGraphResponse]:
        """Fetch the meta information of the graph identified by ``graph_id``."""
        raise NotImplementedError

    @abstractmethod
    def get_graph_statistics(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to get")],
    ) -> Result[GetGraphStatisticsResponse]:
        """Fetch the statistics of the graph identified by ``graph_id``."""
        raise NotImplementedError

    @abstractmethod
    def delete_graph(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to delete")],
    ) -> Result[str]:
        """Delete the graph identified by ``graph_id``."""
        raise NotImplementedError

    @abstractmethod
    def list_graphs(self) -> Result[List[GetGraphResponse]]:
        """List all graphs."""
        raise NotImplementedError

    @abstractmethod
    def bulk_loading(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to load")],
        schema_mapping: SchemaMapping,
    ) -> Result[JobResponse]:
        """Submit a bulk loading job described by ``schema_mapping``."""
        raise NotImplementedError


class ProcedureInterface(metaclass=ABCMeta):
    """Abstract interface declaring the stored procedure operations."""

    @abstractmethod
    def create_procedure(
        self, graph_id: StrictStr, procedure: CreateProcedureRequest
    ) -> Result[CreateProcedureResponse]:
        """Create a procedure on the graph identified by ``graph_id``."""
        raise NotImplementedError

    @abstractmethod
    def delete_procedure(
        self, graph_id: StrictStr, procedure_id: StrictStr
    ) -> Result[str]:
        """Delete the procedure ``procedure_id`` of the given graph."""
        raise NotImplementedError

    @abstractmethod
    def list_procedures(
        self, graph_id: StrictStr
    ) -> Result[List[GetProcedureResponse]]:
        """List the procedures of the given graph."""
        raise NotImplementedError

    @abstractmethod
    def update_procedure(
        self, graph_id: StrictStr, proc_id: StrictStr, procedure: UpdateProcedureRequest
    ) -> Result[str]:
        """Update the procedure ``proc_id`` of the given graph."""
        raise NotImplementedError

    @abstractmethod
    def get_procedure(
        self, graph_id: StrictStr, procedure_id: StrictStr
    ) -> Result[GetProcedureResponse]:
        """Fetch the procedure ``procedure_id`` of the given graph."""
        raise NotImplementedError

    @abstractmethod
    def call_procedure(
        self, graph_id: StrictStr, params: QueryRequest
    ) -> Result[CollectiveResults]:
        """Call a procedure on the given graph."""
        raise NotImplementedError

    @abstractmethod
    def call_procedure_current(self, params: QueryRequest) -> Result[CollectiveResults]:
        """Call a procedure without naming a graph explicitly."""
        raise NotImplementedError

    @abstractmethod
    def call_procedure_raw(self, graph_id: StrictStr, params: bytes) -> Result[str]:
        """Call a procedure on the given graph with a raw bytes payload."""
        raise NotImplementedError

    @abstractmethod
    def call_procedure_current_raw(self, params: bytes) -> Result[str]:
        """Call a procedure with a raw bytes payload, without naming a graph."""
        raise NotImplementedError


# ``metaclass=ABCMeta`` was missing here, so the ``@abstractmethod`` markers
# below were never enforced; it is now declared like the sibling interfaces.
class QueryServiceInterface(metaclass=ABCMeta):
    """Abstract interface declaring the service lifecycle operations."""

    @abstractmethod
    def get_service_status(self) -> Result[ServiceStatus]:
        """Fetch the status of the service."""
        raise NotImplementedError

    @abstractmethod
    def start_service(
        self,
        start_service_request: Annotated[
            Optional[StartServiceRequest],
            Field(description="Start service on a specified graph"),
        ] = None,
    ) -> Result[str]:
        """Start the service, optionally on a specified graph."""
        raise NotImplementedError

    @abstractmethod
    def stop_service(self, graph_id: Optional[str] = None) -> Result[str]:
        """Stop the service; ``graph_id`` is optional."""
        raise NotImplementedError

    @abstractmethod
    def restart_service(self) -> Result[str]:
        """Restart the service."""
        raise NotImplementedError


class JobInterface(metaclass=ABCMeta):
    """Abstract interface declaring the job management operations."""

    @abstractmethod
    def get_job(self, job_id: StrictStr) -> Result[JobStatus]:
        """Fetch the status of the job identified by ``job_id``."""
        raise NotImplementedError

    @abstractmethod
    def list_jobs(self) -> Result[List[JobResponse]]:
        """List all jobs."""
        raise NotImplementedError

    @abstractmethod
    def cancel_job(self, job_id: StrictStr) -> Result[str]:
        """Cancel the job identified by ``job_id``."""
        raise NotImplementedError


class UiltsInterface(metaclass=ABCMeta):
    """Abstract interface declaring utility operations (file upload)."""

    @abstractmethod
    def upload_file(
        self, filestorage: Optional[Union[StrictBytes, StrictStr]]
    ) -> Result[UploadFileResponse]:
        """Upload a file to the server."""
        raise NotImplementedError
class DefaultSession(Session):
    """
    The default session implementation for Interactive SDK.
    It provides the implementation of all service APIs.

    Most methods wrap one generated API call: the HTTP response is turned
    into a ``Result`` with ``Result.from_response`` and any exception raised
    by the call is turned into a ``Result`` with ``Result.from_exception``.
    Identifier arguments are normalised with :meth:`ensure_param_str` before
    the call, which raises for values that are neither ``str`` nor ``int``.
    """

    def __init__(self, admin_uri: str, stored_proc_uri: Optional[str] = None):
        """
        Construct a new session using the specified admin_uri and stored_proc_uri.

        Args:
            admin_uri (str): the uri for the admin service.
            stored_proc_uri (str, optional): the uri for the stored procedure
                service. If not provided, the uri will be read from the
                service status.

        Raises:
            Exception: if ``stored_proc_uri`` is omitted and the service
                status cannot be fetched.
        """
        self._client = ApiClient(Configuration(host=admin_uri))

        self._graph_api = AdminServiceGraphManagementApi(self._client)
        self._job_api = AdminServiceJobManagementApi(self._client)
        self._procedure_api = AdminServiceProcedureManagementApi(self._client)
        self._service_api = AdminServiceServiceManagementApi(self._client)
        self._utils_api = UtilsApi(self._client)
        if stored_proc_uri is None:
            service_status = self.get_service_status()
            if not service_status.is_ok():
                raise Exception(
                    "Failed to get service status: ",
                    service_status.get_status_message(),
                )
            service_port = service_status.get_value().hqps_port
            # replace the port in uri: everything after the last ":" of
            # admin_uri is swapped for the query service port
            splitted = admin_uri.split(":")
            splitted[-1] = str(service_port)
            stored_proc_uri = ":".join(splitted)
        self._query_client = ApiClient(Configuration(host=stored_proc_uri))

        self._query_api = QueryServiceApi(self._query_client)
        self._edge_api = GraphServiceEdgeManagementApi(self._query_client)
        self._vertex_api = GraphServiceVertexManagementApi(self._query_client)

    def __enter__(self):
        """Enter both underlying API clients and return this session."""
        self._client.__enter__()
        self._query_client.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit both underlying API clients (query client first)."""
        try:
            self._query_client.__exit__(
                exc_type=exc_type, exc_value=exc_val, traceback=exc_tb
            )
        finally:
            # Always release the admin client, even if the query client
            # raised while exiting.
            self._client.__exit__(
                exc_type=exc_type, exc_value=exc_val, traceback=exc_tb
            )

    # implementations of the methods from the interfaces

    def add_vertex(
        self,
        graph_id: StrictStr,
        vertex_edge_request: VertexEdgeRequest,
    ) -> Result[StrictStr]:
        """
        Add a vertex to the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            api_response = self._vertex_api.add_vertex_with_http_info(
                graph_id, vertex_edge_request
            )
            return Result.from_response(api_response)
        except Exception as e:
            return Result.from_exception(e)

    def delete_vertex(
        self,
        graph_id: StrictStr,
        label: Annotated[StrictStr, Field(description="The label name of vertex.")],
        primary_key_value: Annotated[
            Any, Field(description="The primary key value of vertex.")
        ],
    ) -> Result[str]:
        """
        Delete a vertex. Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    def get_vertex(
        self,
        graph_id: StrictStr,
        label: Annotated[StrictStr, Field(description="The label name of vertex.")],
        primary_key_value: Annotated[
            Any, Field(description="The primary key value of vertex.")
        ],
    ) -> Result[VertexData]:
        """
        Get a vertex from the specified graph with primary key value.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            api_response = self._vertex_api.get_vertex_with_http_info(
                graph_id, label, primary_key_value
            )
            return Result.from_response(api_response)
        except Exception as e:
            return Result.from_exception(e)

    def update_vertex(
        self, graph_id: StrictStr, vertex_request: VertexRequest
    ) -> Result[str]:
        """
        Update a vertex in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            api_response = self._vertex_api.update_vertex_with_http_info(
                graph_id, vertex_request
            )
            return Result.from_response(api_response)
        except Exception as e:
            return Result.from_exception(e)

    def add_edge(
        self, graph_id: StrictStr, edge_request: List[EdgeRequest]
    ) -> Result[str]:
        """
        Add an edge to the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            api_response = self._edge_api.add_edge_with_http_info(
                graph_id, edge_request
            )
            return Result.from_response(api_response)
        except Exception as e:
            return Result.from_exception(e)

    def delete_edge(
        self,
        graph_id: StrictStr,
        src_label: Annotated[
            StrictStr, Field(description="The label name of src vertex.")
        ],
        src_primary_key_value: Annotated[
            Any, Field(description="The primary key value of src vertex.")
        ],
        dst_label: Annotated[
            StrictStr, Field(description="The label name of dst vertex.")
        ],
        dst_primary_key_value: Annotated[
            Any, Field(description="The primary key value of dst vertex.")
        ],
    ) -> Result[str]:
        """
        Delete an edge. Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    def get_edge(
        self,
        graph_id: StrictStr,
        edge_label: Annotated[StrictStr, Field(description="The label name of edge.")],
        src_label: Annotated[
            StrictStr, Field(description="The label name of src vertex.")
        ],
        src_primary_key_value: Annotated[
            Any, Field(description="The primary key value of src vertex.")
        ],
        dst_label: Annotated[
            StrictStr, Field(description="The label name of dst vertex.")
        ],
        dst_primary_key_value: Annotated[
            Any, Field(description="The primary key value of dst vertex.")
        ],
    ) -> Result[Union[None, EdgeRequest]]:
        """
        Get an edge from the specified graph with primary key value.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            api_response = self._edge_api.get_edge_with_http_info(
                graph_id,
                edge_label,
                src_label,
                src_primary_key_value,
                dst_label,
                dst_primary_key_value,
            )
            return Result.from_response(api_response)
        except Exception as e:
            return Result.from_exception(e)

    def update_edge(
        self, graph_id: StrictStr, edge_request: EdgeRequest
    ) -> Result[str]:
        """
        Update an edge in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            api_response = self._edge_api.update_edge_with_http_info(
                graph_id, edge_request
            )
            return Result.from_response(api_response)
        except Exception as e:
            return Result.from_exception(e)

    def create_graph(self, graph: CreateGraphRequest) -> Result[CreateGraphResponse]:
        """
        Create a new graph with the specified graph request.
        """
        try:
            response = self._graph_api.create_graph_with_http_info(graph)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def get_graph_schema(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to get")],
    ) -> Result[GetGraphSchemaResponse]:
        """Get the schema of a specified graph.

        Parameters:
            graph_id (str): The ID of the graph whose schema is to be retrieved.

        Returns:
            Result[GetGraphSchemaResponse]: The result containing the schema of
                the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._graph_api.get_schema_with_http_info(graph_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def get_graph_meta(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to get")],
    ) -> Result[GetGraphResponse]:
        """
        Get the meta information of a specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._graph_api.get_graph_with_http_info(graph_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def get_graph_statistics(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to get")],
    ) -> Result[GetGraphStatisticsResponse]:
        """
        Get the statistics of a specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._graph_api.get_graph_statistic_with_http_info(graph_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def delete_graph(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to delete")],
    ) -> Result[str]:
        """
        Delete a graph with the specified graph id.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._graph_api.delete_graph_with_http_info(graph_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def list_graphs(self) -> Result[List[GetGraphResponse]]:
        """
        List all graphs.
        """
        try:
            response = self._graph_api.list_graphs_with_http_info()
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def bulk_loading(
        self,
        graph_id: Annotated[StrictStr, Field(description="The id of graph to load")],
        schema_mapping: SchemaMapping,
    ) -> Result[JobResponse]:
        """
        Submit a bulk loading job to the specified graph.

        Input files marked with a leading ``@`` are uploaded first (see
        :meth:`try_upload_files`); if that step fails its result is returned
        and no job is submitted.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        # First try to upload the input files if they are specified with a
        # starting @; the returned schema_mapping refers to the uploaded files.
        upload_res = self.try_upload_files(schema_mapping)
        if not upload_res.is_ok():
            return upload_res
        schema_mapping = upload_res.get_value()
        print("new schema_mapping: ", schema_mapping)
        try:
            response = self._graph_api.create_dataloading_job_with_http_info(
                graph_id, schema_mapping
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def create_procedure(
        self, graph_id: StrictStr, procedure: CreateProcedureRequest
    ) -> Result[CreateProcedureResponse]:
        """
        Create a new procedure in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._procedure_api.create_procedure_with_http_info(
                graph_id, procedure
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def delete_procedure(
        self, graph_id: StrictStr, procedure_id: StrictStr
    ) -> Result[str]:
        """
        Delete a procedure in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        procedure_id = self.ensure_param_str("procedure_id", procedure_id)
        try:
            response = self._procedure_api.delete_procedure_with_http_info(
                graph_id, procedure_id
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def list_procedures(
        self, graph_id: StrictStr
    ) -> Result[List[GetProcedureResponse]]:
        """
        List all procedures in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._procedure_api.list_procedures_with_http_info(graph_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def update_procedure(
        self, graph_id: StrictStr, proc_id: StrictStr, procedure: UpdateProcedureRequest
    ) -> Result[str]:
        """
        Update a procedure in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._procedure_api.update_procedure_with_http_info(
                graph_id, proc_id, procedure
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def get_procedure(
        self, graph_id: StrictStr, procedure_id: StrictStr
    ) -> Result[GetProcedureResponse]:
        """
        Get a procedure in the specified graph.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            response = self._procedure_api.get_procedure_with_http_info(
                graph_id, procedure_id
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def call_procedure(
        self, graph_id: StrictStr, params: QueryRequest
    ) -> Result[CollectiveResults]:
        """
        Call a procedure in the specified graph.

        On HTTP 200 the response body is parsed into ``CollectiveResults``;
        otherwise the status built from the response is returned together
        with an empty ``CollectiveResults``.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            # gs_interactive supports several input formats,
            # see flex/engines/graph_db/graph_db_session.h
            # The JSON-encoded request is tagged with the
            # InputFormat.CYPHER_JSON format byte.
            response = self._query_api.call_proc_with_http_info(
                graph_id=graph_id,
                body=append_format_byte(
                    params.to_json().encode(), InputFormat.CYPHER_JSON
                ),
            )
            result = CollectiveResults()
            if response.status_code == 200:
                result.ParseFromString(response.data)
                return Result.ok(result)
            else:
                return Result(Status.from_response(response), result)
        except Exception as e:
            return Result.from_exception(e)

    def call_procedure_current(self, params: QueryRequest) -> Result[CollectiveResults]:
        """
        Call a procedure in the current graph.

        On HTTP 200 the response body is parsed into ``CollectiveResults``;
        otherwise the status built from the response is returned together
        with an empty ``CollectiveResults``.
        """
        try:
            # gs_interactive supports several input formats,
            # see flex/engines/graph_db/graph_db_session.h
            # The JSON-encoded request is tagged with the
            # InputFormat.CYPHER_JSON format byte.
            response = self._query_api.call_proc_current_with_http_info(
                body=append_format_byte(
                    params.to_json().encode(), InputFormat.CYPHER_JSON
                )
            )
            result = CollectiveResults()
            if response.status_code == 200:
                result.ParseFromString(response.data)
                return Result.ok(result)
            else:
                return Result(Status.from_response(response), result)
        except Exception as e:
            return Result.from_exception(e)

    def call_procedure_raw(self, graph_id: StrictStr, params: bytes) -> Result[str]:
        """
        Call a procedure in the specified graph with raw bytes.
        """
        graph_id = self.ensure_param_str("graph_id", graph_id)
        try:
            # gs_interactive supports several input formats,
            # see flex/engines/graph_db/graph_db_session.h
            # The raw payload is tagged with the InputFormat.CPP_ENCODER
            # format byte (encoder/decoder format).
            response = self._query_api.call_proc_with_http_info(
                graph_id=graph_id,
                body=append_format_byte(params, InputFormat.CPP_ENCODER),
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def call_procedure_current_raw(self, params: bytes) -> Result[str]:
        """
        Call a procedure in the current graph with raw bytes.
        """
        try:
            # gs_interactive supports several input formats,
            # see flex/engines/graph_db/graph_db_session.h
            # The raw payload is tagged with the InputFormat.CPP_ENCODER
            # format byte (encoder/decoder format).
            response = self._query_api.call_proc_current_with_http_info(
                body=append_format_byte(params, InputFormat.CPP_ENCODER)
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def get_service_status(self) -> Result[ServiceStatus]:
        """
        Get the status of the service.
        """
        try:
            response = self._service_api.get_service_status_with_http_info()
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def start_service(
        self,
        start_service_request: Annotated[
            Optional[StartServiceRequest],
            Field(description="Start service on a specified graph"),
        ] = None,
    ) -> Result[str]:
        """
        Start the service on a specified graph.
        """
        try:
            response = self._service_api.start_service_with_http_info(
                start_service_request
            )
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def stop_service(self, graph_id: Optional[str] = None) -> Result[str]:
        """
        Stop the service.

        ``graph_id`` is only set on the stop request when it is truthy.
        """
        try:
            req = StopServiceRequest()
            if graph_id:
                req.graph_id = graph_id
            response = self._service_api.stop_service_with_http_info(req)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def restart_service(self) -> Result[str]:
        """
        Restart the service.
        """
        try:
            response = self._service_api.restart_service_with_http_info()
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def get_job(self, job_id: StrictStr) -> Result[JobStatus]:
        """
        Get the status of a job with the specified job id.
        """
        job_id = self.ensure_param_str("job_id", job_id)
        try:
            response = self._job_api.get_job_by_id_with_http_info(job_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def list_jobs(self) -> Result[List[JobResponse]]:
        """
        List all jobs.
        """
        try:
            response = self._job_api.list_jobs_with_http_info()
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def cancel_job(self, job_id: StrictStr) -> Result[str]:
        """
        Cancel a job with the specified job id.
        """
        job_id = self.ensure_param_str("job_id", job_id)
        try:
            response = self._job_api.delete_job_by_id_with_http_info(job_id)
            return Result.from_response(response)
        except Exception as e:
            return Result.from_exception(e)

    def upload_file(
        self, filestorage: Optional[Union[StrictBytes, StrictStr]]
    ) -> Result[UploadFileResponse]:
        """
        Upload a file to the server.

        The response is converted with ``Result.from_response`` whatever its
        status code; a non-200 status is additionally reported on stdout.
        """
        try:
            print("uploading file: ", filestorage)
            response = self._utils_api.upload_file_with_http_info(filestorage)
            print("response: ", response)
            if response.status_code != 200:
                # Report the file that failed (this used to print the
                # ``input`` builtin by mistake).
                print("Failed to upload file: ", filestorage)
            # On success the response carries the path of the uploaded file
            # on the server.
            return Result.from_response(response)
        except Exception as e:
            print("got exception: ", e)
            return Result.from_exception(e)

    def trim_path(self, path: str) -> str:
        """Return ``path`` without its leading ``@``, if it has one."""
        return path[1:] if path.startswith("@") else path

    def preprocess_inputs(
        self, location: str, inputs: List[str], schema_mapping: SchemaMapping
    ) -> Result[List[str]]:
        """
        Resolve the input paths of one mapping against the root location.

        When ``location`` is set, each input becomes
        ``location + "/" + input`` with the input's leading ``@`` removed;
        otherwise inputs are returned unchanged.

        Args:
            location: root location of the data source, may be empty/None.
            inputs: the input paths of a vertex or edge mapping.
            schema_mapping: unused, kept for interface compatibility.

        Returns:
            An ok result holding the new input list, or a BAD_REQUEST error
            if ``location`` lacks a leading ``@`` while an input has one.
        """
        location_marked = bool(location) and location.startswith("@")
        new_inputs = []
        for i, path in enumerate(inputs):
            # First check whether the input is valid: an @-marked input
            # cannot be combined with an unmarked root location.
            if location and not location_marked and path.startswith("@"):
                message = (
                    "Root location given without @, but the input file starts with @"
                    + path
                    + ", index: "
                    + str(i)
                )
                print(message)
                return Result.error(
                    Status(StatusCode.BAD_REQUEST, message),
                    new_inputs,
                )
            if location:
                new_inputs.append(location + "/" + self.trim_path(path))
            else:
                new_inputs.append(path)
        return Result.ok(new_inputs)

    def check_file_mixup(self, schema_mapping: SchemaMapping) -> Result[SchemaMapping]:
        """
        Resolve all input paths in place and reject mixed upload markers.

        Inputs of every vertex and edge mapping are rewritten through
        :meth:`preprocess_inputs`. Afterwards either all of them or none of
        them may start with ``@``. Data sources whose scheme is not
        ``"file"`` are returned untouched.

        Returns:
            An ok result holding ``schema_mapping``, or an error result if
            preprocessing fails or marked and unmarked files are mixed.
        """
        location = None
        loading_config = schema_mapping.loading_config
        if loading_config and loading_config.data_source:
            if loading_config.data_source.scheme != "file":
                print("Only check mixup for file scheme")
                return Result.ok(schema_mapping)
            location = loading_config.data_source.location

        extracted_files = []
        # Vertex mappings are handled before edge mappings.
        for mappings in (schema_mapping.vertex_mappings, schema_mapping.edge_mappings):
            for mapping in mappings or []:
                if not mapping.inputs:
                    continue
                preprocess_result = self.preprocess_inputs(
                    location, mapping.inputs, schema_mapping
                )
                if not preprocess_result.is_ok():
                    return Result.error(preprocess_result.status, schema_mapping)
                mapping.inputs = preprocess_result.get_value()
                extracted_files.extend(mapping.inputs)

        if extracted_files:
            # count the number of files start with @
            upload_count = sum(1 for path in extracted_files if path.startswith("@"))
            if upload_count == 0:
                print("No file to upload")
                return Result.ok(schema_mapping)
            if upload_count != len(extracted_files):
                message = "Can not mix uploading file and not uploading file"
                print(message)
                # Result.error takes (status, value) like the other call
                # sites in this class; a bare string was passed before.
                return Result.error(
                    Status(StatusCode.BAD_REQUEST, message), schema_mapping
                )
        return Result.ok(schema_mapping)

    def _upload_marked_inputs(self, mappings):
        """Upload every ``@``-prefixed input of ``mappings`` and store the
        returned server path in its place.

        Returns the failed upload result, or None if every upload succeeded.
        """
        for mapping in mappings or []:
            if not mapping.inputs:
                continue
            for i, path in enumerate(mapping.inputs):
                if not path.startswith("@"):
                    continue
                res = self.upload_file(path[1:])
                if not res.is_ok():
                    return res
                mapping.inputs[i] = res.get_value().file_path
        return None

    def upload_and_replace_input_inplace(
        self, schema_mapping: SchemaMapping
    ) -> Result[SchemaMapping]:
        """
        For each input file in schema_mapping, if the file starts with @,
        upload the file to the server, and replace the path with the path
        returned from the server.

        Returns:
            An ok result holding the updated ``schema_mapping``, or an error
            result carrying the status of the first failed upload.
        """
        for mappings in (schema_mapping.vertex_mappings, schema_mapping.edge_mappings):
            failed = self._upload_marked_inputs(mappings)
            if failed is not None:
                return Result.error(failed.status, schema_mapping)
        return Result.ok(schema_mapping)

    def try_upload_files(self, schema_mapping: SchemaMapping) -> Result[SchemaMapping]:
        """
        Try to upload the input files if they are specified with a starting @
        for input files in schema_mapping. Replace the path to the uploaded
        file with the path returned from the server.

        The @ can be added to the beginning of data_source.location in
        schema_mapping.loading_config, or added to each file in
        vertex_mappings and edge_mappings.

        1. location: @/path/to/dir
            inputs:
                - @/path/to/file1
                - @/path/to/file2
        2. location: /path/to/dir
            inputs:
                - @/path/to/file1
                - @/path/to/file2
        3. location: @/path/to/dir
            inputs:
                - /path/to/file1
                - /path/to/file2
        4. location: /path/to/dir
            inputs:
                - /path/to/file1
                - /path/to/file2
        5. location: None
            inputs:
                - @/path/to/file1
                - @/path/to/file2

        Among the above 5 cases, the files are uploaded in cases 1, 3 and 5.
        Case 2 is rejected with a BAD_REQUEST error, and in case 4 nothing
        is uploaded.
        """
        check_mixup_res = self.check_file_mixup(schema_mapping)
        if not check_mixup_res.is_ok():
            return check_mixup_res
        schema_mapping = check_mixup_res.get_value()
        # now try upload the replace inplace
        print("after check_mixup_res: ")
        upload_res = self.upload_and_replace_input_inplace(schema_mapping)
        if not upload_res.is_ok():
            return upload_res
        print("new schema_mapping: ", upload_res.get_value())
        return Result.ok(upload_res.get_value())

    def ensure_param_str(self, param_name: str, param):
        """
        Return ``param`` as a string.

        Strings are returned unchanged and ints are converted with ``str``
        (users may pass e.g. the graph_id as an int).

        Raises:
            Exception: if ``param`` is neither a ``str`` nor an ``int``.
        """
        if isinstance(param, str):
            return param
        # User may input the graph_id as int, convert it to string
        if isinstance(param, int):
            return str(param)
        raise Exception(
            "param should be a string, param_name: "
            + param_name
            + ", param: "
            + str(param)
        )