#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from multiprocessing.pool import ThreadPool
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
    cast,
    TYPE_CHECKING,
)

import numpy as np
import pandas as pd

from pyspark import keyword_only, since, inheritable_thread_target
from pyspark.ml.connect import Estimator, Model
from pyspark.ml.connect.base import Evaluator
from pyspark.ml.connect.io_utils import (
    MetaAlgorithmReadWrite,
    ParamsReadWrite,
)
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasParallelism, HasSeed
from pyspark.sql.functions import col, lit, rand
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.utils import is_remote

if TYPE_CHECKING:
    from pyspark.ml._typing import ParamMap


class _ValidatorParams(HasSeed):
    """
    Common params for TrainValidationSplit and CrossValidator.
    """

    estimator: Param[Estimator] = Param(
        Params._dummy(), "estimator", "estimator to be cross-validated"
    )
    estimatorParamMaps: Param[List["ParamMap"]] = Param(
        Params._dummy(), "estimatorParamMaps", "estimator param maps"
    )
    evaluator: Param[Evaluator] = Param(
        Params._dummy(),
        "evaluator",
        "evaluator used to select hyper-parameters that maximize the validator metric",
    )

    @since("2.0.0")
    def getEstimator(self) -> Estimator:
        """
        Gets the value of estimator or its default value.
        """
        return self.getOrDefault(self.estimator)

    @since("2.0.0")
    def getEstimatorParamMaps(self) -> List["ParamMap"]:
        """
        Gets the value of estimatorParamMaps or its default value.
        """
        return self.getOrDefault(self.estimatorParamMaps)

    @since("2.0.0")
    def getEvaluator(self) -> Evaluator:
        """
        Gets the value of evaluator or its default value.
        """
        return self.getOrDefault(self.evaluator)


class _CrossValidatorParams(_ValidatorParams):
    """
    Params for :py:class:`CrossValidator` and :py:class:`CrossValidatorModel`.

    .. versionadded:: 3.5.0
    """

    numFolds: Param[int] = Param(
        Params._dummy(),
        "numFolds",
        "number of folds for cross validation",
        typeConverter=TypeConverters.toInt,
    )

    foldCol: Param[str] = Param(
        Params._dummy(),
        "foldCol",
        "Param for the column name of user "
        + "specified fold number. Once this is specified, :py:class:`CrossValidator` "
        + "won't do random k-fold split. Note that this column should be integer type "
        + "with range [0, numFolds) and Spark will throw exception on out-of-range "
        + "fold numbers.",
        typeConverter=TypeConverters.toString,
    )

    def __init__(self, *args: Any):
        super(_CrossValidatorParams, self).__init__(*args)
        self._setDefault(numFolds=3, foldCol="")

    @since("1.4.0")
    def getNumFolds(self) -> int:
        """
        Gets the value of numFolds or its default value.
        """
        return self.getOrDefault(self.numFolds)

    @since("3.1.0")
    def getFoldCol(self) -> str:
        """
        Gets the value of foldCol or its default value.
"""returnself.getOrDefault(self.foldCol)def_parallelFitTasks(estimator:Estimator,train:DataFrame,evaluator:Evaluator,validation:DataFrame,epm:Sequence["ParamMap"],)->List[Callable[[],Tuple[int,float]]]:""" Creates a list of callables which can be called from different threads to fit and evaluate an estimator in parallel. Each callable returns an `(index, metric)` pair. Parameters ---------- est : :py:class:`pyspark.ml.baseEstimator` he estimator to be fit. train : :py:class:`pyspark.sql.DataFrame` DataFrame, training data set, used for fitting. eva : :py:class:`pyspark.ml.evaluation.Evaluator` used to compute `metric` validation : :py:class:`pyspark.sql.DataFrame` DataFrame, validation data set, used for evaluation. epm : :py:class:`collections.abc.Sequence` Sequence of ParamMap, params maps to be used during fitting & evaluation. collectSubModel : bool Whether to collect sub model. Returns ------- tuple (int, float), an index into `epm` and the associated metric value. """active_session=SparkSession.getActiveSession()ifactive_sessionisNone:raiseRuntimeError("An active SparkSession is required for running cross valiator fit tasks.")defget_single_task(index:int,param_map:Any)->Callable[[],Tuple[int,float]]:defsingle_task()->Tuple[int,float]:ifnotis_remote():# Active session is thread-local variable, in background thread the active session# is not set, the following line sets it as the main thread active session.active_session._jvm.SparkSession.setActiveSession(# type: ignore[union-attr]active_session._jsparkSession)model=estimator.fit(train,param_map)metric=evaluator.evaluate(model.transform(validation,param_map)# type: ignore[union-attr])returnindex,metricreturnsingle_taskreturn[get_single_task(index,param_map)forindex,param_mapinenumerate(epm)]class_CrossValidatorReadWrite(MetaAlgorithmReadWrite):def_get_skip_saving_params(self)->List[str]:""" Returns params to be skipped when saving metadata. 
"""return["estimator","estimatorParamMaps","evaluator"]def_save_meta_algorithm(self,root_path:str,node_path:List[str])->Dict[str,Any]:metadata=self._get_metadata_to_save()metadata["estimator"]=self.getEstimator()._save_to_node_path(# type: ignore[attr-defined]root_path,node_path+["crossvalidator_estimator"])metadata["evaluator"]=self.getEvaluator()._save_to_node_path(# type: ignore[attr-defined]root_path,node_path+["crossvalidator_evaluator"])metadata["estimator_param_maps"]=[[{"parent":param.parent,"name":param.name,"value":value}forparam,valueinparam_map.items()]forparam_mapinself.getEstimatorParamMaps()# type: ignore[attr-defined]]ifisinstance(self,CrossValidatorModel):metadata["avg_metrics"]=self.avgMetricsmetadata["std_metrics"]=self.stdMetricsmetadata["best_model"]=self.bestModel._save_to_node_path(root_path,node_path+["crossvalidator_best_model"])returnmetadatadef_load_meta_algorithm(self,root_path:str,node_metadata:Dict[str,Any])->None:estimator=ParamsReadWrite._load_instance_from_metadata(node_metadata["estimator"],root_path)self.set(self.estimator,estimator)# type: ignore[attr-defined]evaluator=ParamsReadWrite._load_instance_from_metadata(node_metadata["evaluator"],root_path)self.set(self.evaluator,evaluator)# type: ignore[attr-defined]json_epm=node_metadata["estimator_param_maps"]uid_to_instances=MetaAlgorithmReadWrite.get_uid_map(estimator)epm=[]forjson_param_mapinjson_epm:param_map={}forjson_paraminjson_param_map:est=uid_to_instances[json_param["parent"]]param=getattr(est,json_param["name"])value=json_param["value"]param_map[param]=valueepm.append(param_map)self.set(self.estimatorParamMaps,epm)# type: ignore[attr-defined]ifisinstance(self,CrossValidatorModel):self.avgMetrics=node_metadata["avg_metrics"]self.stdMetrics=node_metadata["std_metrics"]self.bestModel=ParamsReadWrite._load_instance_from_metadata(node_metadata["best_model"],root_path)
class CrossValidator(
    Estimator["CrossValidatorModel"],
    _CrossValidatorParams,
    HasParallelism,
    _CrossValidatorReadWrite,
):
    """
    K-fold cross validation performs model selection by splitting the dataset into a set of
    non-overlapping, randomly partitioned folds which are used as separate training and test
    datasets. E.g., with k=3 folds, K-fold cross validation will generate 3 (training, test)
    dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing.
    Each fold is used as the test set exactly once.

    .. versionadded:: 3.5.0

    Examples
    --------
    >>> from pyspark.ml.connect.tuning import CrossValidator
    >>> from pyspark.ml.connect.classification import LogisticRegression
    >>> from pyspark.ml.connect.evaluation import BinaryClassificationEvaluator
    >>> from pyspark.ml.tuning import ParamGridBuilder
    >>> from sklearn.datasets import load_breast_cancer
    >>> lor = LogisticRegression(maxIter=20, learningRate=0.01)
    >>> ev = BinaryClassificationEvaluator()
    >>> grid = ParamGridBuilder().addGrid(lor.maxIter, [2, 20]).build()
    >>> cv = CrossValidator(estimator=lor, evaluator=ev, estimatorParamMaps=grid)
    >>> sk_dataset = load_breast_cancer()
    >>> train_dataset = spark.createDataFrame(
    ...     zip(sk_dataset.data.tolist(), [int(t) for t in sk_dataset.target]),
    ...     schema="features: array<double>, label: long",
    ... )
    >>> cv_model = cv.fit(train_dataset)
    >>> transformed_dataset = cv_model.transform(train_dataset.limit(10))
    >>> cv_model.avgMetrics
    [0.5527792527167658, 0.8348714668615984]
    >>> cv_model.stdMetrics
    [0.04902833489813031, 0.05247132866444953]
    """

    _input_kwargs: Dict[str, Any]

    @keyword_only
    def __init__(
        self,
        *,
        estimator: Optional[Estimator] = None,
        estimatorParamMaps: Optional[List["ParamMap"]] = None,
        evaluator: Optional[Evaluator] = None,
        numFolds: int = 3,
        seed: Optional[int] = None,
        parallelism: int = 1,
        foldCol: str = "",
    ) -> None:
        """
        __init__(self, \\*, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
                 seed=None, parallelism=1, foldCol="")
        """
        super(CrossValidator, self).__init__()
        self._setDefault(parallelism=1)
        kwargs = self._input_kwargs
        self._set(**kwargs)
[docs]@since("3.5.0")defsetEstimator(self,value:Estimator)->"CrossValidator":""" Sets the value of :py:attr:`estimator`. """returnself._set(estimator=value)
[docs]@since("3.5.0")defsetEstimatorParamMaps(self,value:List["ParamMap"])->"CrossValidator":""" Sets the value of :py:attr:`estimatorParamMaps`. """returnself._set(estimatorParamMaps=value)
[docs]@since("3.5.0")defsetEvaluator(self,value:Evaluator)->"CrossValidator":""" Sets the value of :py:attr:`evaluator`. """returnself._set(evaluator=value)
[docs]@since("3.5.0")defsetNumFolds(self,value:int)->"CrossValidator":""" Sets the value of :py:attr:`numFolds`. """returnself._set(numFolds=value)
[docs]@since("3.5.0")defsetFoldCol(self,value:str)->"CrossValidator":""" Sets the value of :py:attr:`foldCol`. """returnself._set(foldCol=value)
[docs]defsetSeed(self,value:int)->"CrossValidator":""" Sets the value of :py:attr:`seed`. """returnself._set(seed=value)
[docs]defsetParallelism(self,value:int)->"CrossValidator":""" Sets the value of :py:attr:`parallelism`. """returnself._set(parallelism=value)
[docs]defsetCollectSubModels(self,value:bool)->"CrossValidator":""" Sets the value of :py:attr:`collectSubModels`. """returnself._set(collectSubModels=value)
    @staticmethod
    def _gen_avg_and_std_metrics(
        metrics_all: List[List[float]],
    ) -> Tuple[List[float], List[float]]:
        avg_metrics = np.mean(metrics_all, axis=0)
        std_metrics = np.std(metrics_all, axis=0)
        return list(avg_metrics), list(std_metrics)

    def _fit(self, dataset: Union[pd.DataFrame, DataFrame]) -> "CrossValidatorModel":
        if isinstance(dataset, pd.DataFrame):
            # TODO: support pandas dataframe fitting
            raise NotImplementedError("Fitting pandas dataframe is not supported yet.")

        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        nFolds = self.getOrDefault(self.numFolds)

        metrics_all = [[0.0] * numModels for i in range(nFolds)]

        pool = ThreadPool(processes=min(self.getParallelism(), numModels))
        datasets = self._kFold(dataset)

        for i in range(nFolds):
            validation = datasets[i][1].cache()
            train = datasets[i][0].cache()

            tasks = _parallelFitTasks(est, train, eva, validation, epm)
            if not is_remote():
                tasks = list(map(inheritable_thread_target, tasks))

            for j, metric in pool.imap_unordered(lambda f: f(), tasks):
                metrics_all[i][j] = metric

            validation.unpersist()
            train.unpersist()

        metrics, std_metrics = CrossValidator._gen_avg_and_std_metrics(metrics_all)

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestModel = cast(Model, est.fit(dataset, epm[bestIndex]))
        cv_model = self._copyValues(
            CrossValidatorModel(
                bestModel,
                avgMetrics=metrics,
                stdMetrics=std_metrics,
            )
        )
        cv_model._resetUid(self.uid)
        return cv_model

    def _kFold(self, dataset: DataFrame) -> List[Tuple[DataFrame, DataFrame]]:
        nFolds = self.getOrDefault(self.numFolds)
        foldCol = self.getOrDefault(self.foldCol)

        datasets = []
        if not foldCol:
            # Do random k-fold split.
            seed = self.getOrDefault(self.seed)
            h = 1.0 / nFolds
            randCol = self.uid + "_rand"
            df = dataset.select("*", rand(seed).alias(randCol))
            for i in range(nFolds):
                validateLB = i * h
                validateUB = (i + 1) * h
                condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
                validation = df.filter(condition)
                train = df.filter(~condition)
                datasets.append((train, validation))
        else:
            # TODO:
            #  Add verification that foldCol column values are in range [0, nFolds)
            for i in range(nFolds):
                training = dataset.filter(col(foldCol) != lit(i))
                validation = dataset.filter(col(foldCol) == lit(i))
                if training.isEmpty():
                    raise ValueError("The training data at fold %s is empty." % i)
                if validation.isEmpty():
                    raise ValueError("The validation data at fold %s is empty." % i)
                datasets.append((training, validation))

        return datasets
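
    # Worked example of the random split in ``_kFold`` above (assumed numFolds=3): ``h = 1/3``
    # and the ``<uid>_rand`` column is uniform in [0, 1), so fold 0 uses rows with rand in
    # [0.0, 1/3) as validation and the remainder as training, fold 1 uses [1/3, 2/3), and
    # fold 2 uses [2/3, 1.0); every row lands in exactly one validation set.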
    def copy(self, extra: Optional["ParamMap"] = None) -> "CrossValidator":
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copy creates a deep copy of
        the embedded paramMap, and copies the embedded and extra parameters over.

        .. versionadded:: 3.5.0

        Parameters
        ----------
        extra : dict, optional
            Extra parameters to copy to the new instance

        Returns
        -------
        :py:class:`CrossValidator`
            Copy of this instance
        """
        if extra is None:
            extra = dict()
        newCV = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newCV.setEstimator(self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newCV.setEvaluator(self.getEvaluator().copy(extra))
        return newCV
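
# Note on metric bookkeeping in ``CrossValidator._fit``: ``metrics_all`` is a
# ``numFolds x len(estimatorParamMaps)`` matrix, so ``avgMetrics[j]`` / ``stdMetrics[j]`` on the
# model below correspond to ``estimatorParamMaps[j]`` in the same order, and the best model is
# refit on the full dataset with ``epm[bestIndex]``, where ``bestIndex`` is the argmax of the
# averaged metrics (or the argmin when the evaluator's ``isLargerBetter()`` is False).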
class CrossValidatorModel(Model, _CrossValidatorParams, _CrossValidatorReadWrite):
    """
    CrossValidatorModel contains the model with the highest average cross-validation
    metric across folds and uses this model to transform input data. CrossValidatorModel
    also tracks the metrics for each param map evaluated.

    .. versionadded:: 3.5.0
    """

    def __init__(
        self,
        bestModel: Optional[Model] = None,
        avgMetrics: Optional[List[float]] = None,
        stdMetrics: Optional[List[float]] = None,
    ) -> None:
        super(CrossValidatorModel, self).__init__()
        #: best model from cross validation
        self.bestModel = bestModel
        #: Average cross-validation metrics for each paramMap in
        #: CrossValidator.estimatorParamMaps, in the corresponding order.
        self.avgMetrics = avgMetrics or []
        #: standard deviation of metrics for each paramMap in
        #: CrossValidator.estimatorParamMaps, in the corresponding order.
        self.stdMetrics = stdMetrics or []

    def _transform(
        self, dataset: Union[DataFrame, pd.DataFrame]
    ) -> Union[DataFrame, pd.DataFrame]:
        return self.bestModel.transform(dataset)
    def copy(self, extra: Optional["ParamMap"] = None) -> "CrossValidatorModel":
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        It does not copy the extra Params into the subModels.

        .. versionadded:: 3.5.0

        Parameters
        ----------
        extra : dict, optional
            Extra parameters to copy to the new instance

        Returns
        -------
        :py:class:`CrossValidatorModel`
            Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        avgMetrics = list(self.avgMetrics)
        stdMetrics = list(self.stdMetrics)
        return self._copyValues(
            CrossValidatorModel(bestModel, avgMetrics=avgMetrics, stdMetrics=stdMetrics),
            extra=extra,
        )
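
# Usage sketch for a user-specified fold column (illustrative only; reuses ``lor``, ``ev`` and
# ``grid`` from the CrossValidator docstring example, and assumes a DataFrame ``df`` with an
# integer "fold" column whose values lie in [0, numFolds) as required by ``foldCol``):
#
#     cv = CrossValidator(
#         estimator=lor, evaluator=ev, estimatorParamMaps=grid, numFolds=3, foldCol="fold"
#     )
#     cv_model = cv.fit(df)      # no random split; folds are taken from df["fold"]
#     cv_model.avgMetrics        # one averaged metric per entry in `grid`
#     cv_model.bestModel         # model refit on the full dataset with the best param map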