Skip to content

Commit

Permalink
Add kmeans clustering function
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindeide committed Feb 8, 2021
1 parent 396c4c7 commit 68ea50b
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 42 deletions.
8 changes: 8 additions & 0 deletions semeio/workflows/misfit_preprocessor/hierarchical_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@
Extra,
StrictFloat,
StrictInt,
PyObject,
PrivateAttr,
)
import collections

from semeio.workflows.spearman_correlation_job.cluster_analysis import (
fcluster_analysis,
)

# pylint: disable=too-few-public-methods,no-self-argument


Expand Down Expand Up @@ -136,9 +142,11 @@ class HierarchicalConfig(AbstractClusteringConfig):
type: Literal["hierarchical"] = "hierarchical"
linkage: LinkageConfig = LinkageConfig()
fcluster: FclusterConfig = FclusterConfig()
_cluster_function: PyObject = PrivateAttr(fcluster_analysis)


class LimitedHierarchicalConfig(AbstractClusteringConfig):
    # Restricted hierarchical clustering config used by the auto_scale
    # workflow: it uses BaseFclusterConfig (the workflow supplies
    # criterion/threshold from the PCA result), in contrast to
    # HierarchicalConfig where the user configures fcluster fully.
    type: Literal["limited_hierarchical"] = "limited_hierarchical"
    linkage: LinkageConfig = LinkageConfig()
    fcluster: BaseFclusterConfig = BaseFclusterConfig()
    # Private attribute (excluded from the pydantic schema): the callable
    # that performs the actual clustering.
    _cluster_function: PyObject = PrivateAttr(fcluster_analysis)
37 changes: 15 additions & 22 deletions semeio/workflows/misfit_preprocessor/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,23 @@

def run(config, measured_data, reporter):
workflow = config.workflow
if workflow.type == "custom_scale":
sconfig = workflow.clustering.cluster_args()
scaling_configs = spearman_job(
measured_data,
reporter,
**sconfig,
)
pca_threshold = config.workflow.pca.threshold
elif workflow.type == "auto_scale":
sconfig = workflow.clustering.cluster_args()
if workflow.type == "auto_scale":
job = ObservationScaleFactor(reporter, measured_data)
nr_components, _ = job.perform_pca(workflow.pca.threshold)
sconfig = workflow.clustering.cluster_args()
scaling_configs = spearman_job(
measured_data,
reporter,
criterion="maxclust",
threshold=nr_components,
**sconfig,
)
pca_threshold = workflow.pca.threshold
else:
raise AssertionError(
"Unknown clustering method: {}".format(config.workflow.method)
)
if workflow.clustering.type == "limited_hierarchical":
sconfig["criterion"] = "maxclust"
sconfig["threshold"] = nr_components
elif workflow.clustering.type == "limited_kmeans":
sconfig["n_clusters"] = nr_components

scaling_configs = spearman_job(
measured_data,
reporter,
cluster_function=workflow.clustering._cluster_function,
**sconfig,
)
pca_threshold = workflow.pca.threshold

scaling_params = {"threshold": pca_threshold}
for scaling_config in scaling_configs:
Expand Down
62 changes: 62 additions & 0 deletions semeio/workflows/misfit_preprocessor/kmeans_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

from typing import Optional

from pydantic import conint, Field, PyObject, PrivateAttr

from semeio.workflows.misfit_preprocessor.hierarchical_config import (
AbstractClusteringConfig,
)
from semeio.workflows.spearman_correlation_job.cluster_analysis import (
kmeans_analysis,
)


class LimitedKmeansClustering(AbstractClusteringConfig):
    """
    The kmeans implementation is backed by sklearn and for a
    more detailed description we refer the reader to the
    documentation of sklearn.cluster.KMeans
    (https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html).
    """

    type: Literal["limited_kmeans"] = "limited_kmeans"
    init: Literal["k-means++", "random"] = Field(
        "k-means++",
        # Fixed: the previous description ("The criterion to use in forming
        # flat clusters.") was copy-pasted from the fcluster config and did
        # not describe this parameter.
        description=("Method used to initialize the cluster centroids."),
    )
    n_init: conint(gt=0, strict=True) = Field(
        10,
        description=(
            "Number of time the k-means algorithm will be run with "
            "different centroid seeds. The final results will be the "
            "best output of n_init consecutive runs in terms of inertia."
        ),
    )
    max_iter: conint(gt=0, strict=True) = Field(
        300,
        description=(
            "Maximum number of iterations of the k-means algorithm for a single run."
        ),
    )
    # Explicitly Optional: the default is None (non-deterministic seeding).
    # Relying on pydantic's implicit-Optional-when-default-is-None behavior
    # is deprecated and removed in pydantic v2.
    random_state: Optional[conint(gt=0, strict=True)] = Field(
        None,
        description=(
            "Determines random number generation for centroid initialization. "
            "Use an int to make the randomness deterministic"
        ),
    )
    # Private attribute (excluded from the schema): callable performing the
    # actual clustering, consumed by the misfit preprocessor job.
    _cluster_function: PyObject = PrivateAttr(kmeans_analysis)


class KmeansClustering(LimitedKmeansClustering):
    # Reuse the parent docstring so the generated schema documentation
    # stays identical for both variants.
    __doc__ = LimitedKmeansClustering.__doc__
    type: Literal["kmeans"] = "kmeans"
    n_clusters: conint(gt=0, strict=True) = Field(
        8,
        # Fixed: the previous description was copy-pasted from the fcluster
        # config ("maximum number of clusters") and wrongly claimed the
        # value is not configurable — this is the user-configurable
        # (custom_scale) variant; the limited variant derives the count
        # from the PCA analysis instead.
        description=(
            "The number of clusters to form (and the number of centroids "
            "to generate). "
        ),
    )
11 changes: 9 additions & 2 deletions semeio/workflows/misfit_preprocessor/workflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
LimitedHierarchicalConfig,
BaseMisfitPreprocessorConfig,
)
from semeio.workflows.misfit_preprocessor.kmeans_config import (
KmeansClustering,
LimitedKmeansClustering,
)


# pylint: disable=too-few-public-methods,no-self-argument

Expand All @@ -21,13 +26,15 @@ class PCAConfig(BaseMisfitPreprocessorConfig):

class AutoScaleConfig(BaseMisfitPreprocessorConfig):
    # Config for the auto_scale workflow. Only the "limited" clustering
    # configs are valid here: the number of clusters is derived from the
    # PCA analysis rather than configured by the user.
    # (Removed a stale duplicate `clustering:` line left behind by the
    # change that introduced the Union type.)
    type: Literal["auto_scale"] = "auto_scale"
    clustering: Union[
        LimitedHierarchicalConfig, LimitedKmeansClustering
    ] = LimitedHierarchicalConfig()
    pca: PCAConfig = PCAConfig()


class CustomScaleConfig(BaseMisfitPreprocessorConfig):
    # Config for the custom_scale workflow: the user controls every
    # clustering parameter, so the full (non-limited) configs are used.
    # (Removed a stale duplicate `clustering:` line left behind by the
    # change that introduced the Union type.)
    type: Literal["custom_scale"] = "custom_scale"
    clustering: Union[HierarchicalConfig, KmeansClustering] = HierarchicalConfig()
    pca: PCAConfig = PCAConfig()


Expand Down
32 changes: 32 additions & 0 deletions semeio/workflows/spearman_correlation_job/cluster_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from scipy.cluster.hierarchy import fcluster, linkage
from sklearn.cluster import KMeans


def fcluster_analysis(
    correlation_matrix,
    threshold=1.0,
    criterion="inconsistent",
    depth=2,
    method="single",
    metric="euclidean",
):
    """Cluster the rows of ``correlation_matrix`` hierarchically.

    Builds a scipy linkage matrix with the given ``method``/``metric`` and
    flattens it with ``fcluster`` using ``threshold``/``criterion``/``depth``.
    Returns an array of cluster labels, 1-indexed (scipy convention).
    """
    linkage_matrix = linkage(correlation_matrix, method, metric)
    return fcluster(
        linkage_matrix,
        threshold,
        criterion=criterion,
        depth=depth,
    )


def kmeans_analysis(
    correlation_matrix,
    n_clusters=8,
    init="k-means++",
    n_init=10,
    max_iter=300,
    random_state=None,
):
    """Cluster the rows of ``correlation_matrix`` with sklearn KMeans.

    Returns an array of cluster labels shifted to start at 1, so the
    output format matches scipy's ``fcluster`` (which is 1-indexed,
    while sklearn labels are 0-indexed).
    """
    estimator = KMeans(
        n_clusters=n_clusters,
        init=init,
        n_init=n_init,
        max_iter=max_iter,
        random_state=random_state,
    )
    fitted = estimator.fit(correlation_matrix)
    # Shift 0-indexed sklearn labels to the 1-indexed scipy convention.
    return fitted.labels_ + 1
20 changes: 6 additions & 14 deletions semeio/workflows/spearman_correlation_job/job.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# -*- coding: utf-8 -*-
import itertools
import logging
from scipy.cluster.hierarchy import fcluster, linkage

from semeio.workflows.spearman_correlation_job.cluster_analysis import (
fcluster_analysis,
)


def spearman_job(
measured_data,
reporter,
cluster_function=fcluster_analysis,
**cluster_args,
):
"""
Expand All @@ -21,7 +25,7 @@ def spearman_job(
correlation_matrix = _calculate_correlation_matrix(simulated_data)
reporter.publish_csv("correlation_matrix", correlation_matrix)

clusters = _cluster_analysis(correlation_matrix, **cluster_args)
clusters = cluster_function(correlation_matrix, **cluster_args)

columns = correlation_matrix.columns

Expand Down Expand Up @@ -91,15 +95,3 @@ def _calculate_correlation_matrix(data):
# of pandas (https://github.com/pandas-dev/pandas/pull/28151), for now this is
# equivalent:
return data.rank().corr(method="pearson")


def _cluster_analysis(
correlation_matrix,
threshold=1.15,
criterion="inconsistent",
depth=2,
method="single",
metric="euclidean",
):
a = linkage(correlation_matrix, method, metric)
return fcluster(a, threshold, criterion=criterion, depth=depth)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"numpy",
"pandas>0.24",
"pydantic",
"sklearn",
"scipy",
"xlrd<2",
"stea",
Expand Down
4 changes: 3 additions & 1 deletion tests/jobs/misfit_preprocessor/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,14 @@ def test_misfit_preprocessor_invalid_config(test_data_root):
expected_err_msg = (
"Invalid configuration of misfit preprocessor\n"
" - extra fields not permitted (workflow.clustering.threshold)\n"
# There are two clustering functions, and this is invalid in both
" - extra fields not permitted (workflow.clustering.threshold)\n"
" - extra fields not permitted (unknown_key)\n"
)
job = misfit_preprocessor.MisfitPreprocessorJob(ert)
with pytest.raises(semeio.workflows.misfit_preprocessor.ValidationError) as err:
job.run(config_file)
assert expected_err_msg == str(err.value)
assert str(err.value) == expected_err_msg


@pytest.mark.usefixtures("setup_tmpdir")
Expand Down
33 changes: 30 additions & 3 deletions tests/jobs/misfit_preprocessor/test_misfit_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,33 @@ def parameter_distribution():
return observations, simulated


@pytest.mark.parametrize("clustering_function", ["hierarchical", "kmeans"])
@pytest.mark.parametrize("method", ["custom_scale", "auto_scale"])
@pytest.mark.parametrize(
"num_polynomials",
tuple(range(1, 5)) + (20, 100),
)
def test_misfit_preprocessor_n_polynomials(num_polynomials, method):
def test_misfit_preprocessor_n_polynomials(
num_polynomials, method, clustering_function
):
"""
The goal of this test is to create a data set of uncorrelated polynomials,
meaning that there should be as many clusters as there are input polynomials.
"""

# If we are using kmeans where we have to specify the number of clusters,
# the default set up is not that viable, so we give the number
# of clusters.
if clustering_function == "kmeans" and method == "custom_scale":
sconfig = {"n_clusters": num_polynomials}
else:
sconfig = None

# The clustering functions for auto_scale are limited compared
# with custom_scale
if method == "auto_scale":
clustering_function = "limited_" + clustering_function

state_size = 3
poly_states = [range(1, state_size + 1) for _ in range(num_polynomials)]

Expand All @@ -169,14 +186,23 @@ def test_misfit_preprocessor_n_polynomials(num_polynomials, method):
# We set the PCA threshold to 0.99 so a high degree of correlation is required
# to have an impact. Setting it this way only has an impact for "auto_scale"
obs_keys = measured_data.data.columns.get_level_values(0)
job_config = {
"workflow": {
"type": method,
"pca": {"threshold": 0.99},
"clustering": {"type": clustering_function},
}
}
if sconfig:
job_config["workflow"]["clustering"].update(sconfig)
config = assemble_config(
{"workflow": {"type": method, "pca": {"threshold": 0.99}}},
job_config,
list(obs_keys),
)
reporter_mock = Mock()
configs = misfit_preprocessor.run(config, measured_data, reporter_mock)
assert_homogen_clusters(configs)
assert num_polynomials == len(configs), configs
assert len(configs) == num_polynomials, configs


@pytest.mark.parametrize("linkage", ["average", "single"])
Expand Down Expand Up @@ -269,6 +295,7 @@ def test_misfit_preprocessor_configuration_errors():
expected_err_msg = (
"Invalid configuration of misfit preprocessor\n"
" - extra fields not permitted (workflow.clustering.threshold)\n"
" - extra fields not permitted (workflow.clustering.threshold)\n"
" - extra fields not permitted (unknown_key)\n"
)
assert str(ve.value) == expected_err_msg
Expand Down
25 changes: 25 additions & 0 deletions tests/jobs/misfit_preprocessor/unit/test_cluster_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# pylint: disable=unbalanced-tuple-unpacking
from semeio.workflows.spearman_correlation_job.cluster_analysis import (
kmeans_analysis,
fcluster_analysis,
)
from sklearn.datasets import make_blobs
import pytest


@pytest.mark.parametrize(
    "func, kwargs",
    ((fcluster_analysis, {"criterion": "maxclust"}), (kmeans_analysis, {})),
)
def test_same_format(func, kwargs):
    """Check both clustering functions emit labels in the same (1-indexed) format.

    Scipy clusters (labels) are 1-indexed while sklearn are 0-indexed, so
    we verify the wrappers normalize to the scipy convention rather than
    testing the clustering quality itself.
    """
    # Three well-separated, tight blobs: every implementation should find
    # exactly three clusters on this data.
    points, _ = make_blobs(
        n_samples=200, centers=3, cluster_std=0.1, random_state=42
    )
    labels = func(points, 3, **kwargs)
    # Label-to-cluster assignment may be permuted between implementations,
    # so only assert that the expected 1-indexed label set is present.
    assert set(labels) == {1, 2, 3}
Loading

0 comments on commit 68ea50b

Please sign in to comment.