Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kmeans clustering function #286

Merged
merged 1 commit into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions semeio/workflows/misfit_preprocessor/hierarchical_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@
Extra,
StrictFloat,
StrictInt,
PyObject,
PrivateAttr,
)
import collections

from semeio.workflows.spearman_correlation_job.cluster_analysis import (
fcluster_analysis,
)

# pylint: disable=too-few-public-methods,no-self-argument


Expand Down Expand Up @@ -136,9 +142,11 @@ class HierarchicalConfig(AbstractClusteringConfig):
type: Literal["hierarchical"] = "hierarchical"
linkage: LinkageConfig = LinkageConfig()
fcluster: FclusterConfig = FclusterConfig()
_cluster_function: PyObject = PrivateAttr(fcluster_analysis)


class LimitedHierarchicalConfig(AbstractClusteringConfig):
    # Discriminator used by the workflow config Union to select this model.
    type: Literal["limited_hierarchical"] = "limited_hierarchical"
    linkage: LinkageConfig = LinkageConfig()
    # "Limited" variant: uses the base fcluster config where criterion and
    # threshold are injected by the auto_scale workflow (derived from PCA)
    # rather than configured by the user.
    fcluster: BaseFclusterConfig = BaseFclusterConfig()
    # Resolved clustering callable; private so it is not user-configurable.
    _cluster_function: PyObject = PrivateAttr(fcluster_analysis)
37 changes: 15 additions & 22 deletions semeio/workflows/misfit_preprocessor/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,23 @@

def run(config, measured_data, reporter):
workflow = config.workflow
if workflow.type == "custom_scale":
sconfig = workflow.clustering.cluster_args()
scaling_configs = spearman_job(
measured_data,
reporter,
**sconfig,
)
pca_threshold = config.workflow.pca.threshold
elif workflow.type == "auto_scale":
sconfig = workflow.clustering.cluster_args()
if workflow.type == "auto_scale":
job = ObservationScaleFactor(reporter, measured_data)
nr_components, _ = job.perform_pca(workflow.pca.threshold)
sconfig = workflow.clustering.cluster_args()
scaling_configs = spearman_job(
measured_data,
reporter,
criterion="maxclust",
threshold=nr_components,
**sconfig,
)
pca_threshold = workflow.pca.threshold
else:
raise AssertionError(
"Unknown clustering method: {}".format(config.workflow.method)
)
if workflow.clustering.type == "limited_hierarchical":
sconfig["criterion"] = "maxclust"
lars-petter-hauge marked this conversation as resolved.
Show resolved Hide resolved
sconfig["threshold"] = nr_components
elif workflow.clustering.type == "limited_kmeans":
sconfig["n_clusters"] = nr_components

scaling_configs = spearman_job(
measured_data,
reporter,
cluster_function=workflow.clustering._cluster_function,
oyvindeide marked this conversation as resolved.
Show resolved Hide resolved
**sconfig,
)
pca_threshold = workflow.pca.threshold

scaling_params = {"threshold": pca_threshold}
for scaling_config in scaling_configs:
Expand Down
62 changes: 62 additions & 0 deletions semeio/workflows/misfit_preprocessor/kmeans_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from pydantic import conint, Field, PyObject, PrivateAttr

from semeio.workflows.misfit_preprocessor.hierarchical_config import (
AbstractClusteringConfig,
)
from semeio.workflows.spearman_correlation_job.cluster_analysis import (
kmeans_analysis,
)


class LimitedKmeansClustering(AbstractClusteringConfig):
    """
    The kmeans implementation is backed by sklearn and for a
    more detailed description we refer the reader to the
    documentation of sklearn.cluster.KMeans
    (https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html).
    """

    # Discriminator used by the workflow config Union to select this model.
    type: Literal["limited_kmeans"] = "limited_kmeans"
    # NOTE(review): this description looks copy-pasted from the fcluster
    # config; for KMeans, `init` selects the centroid initialization strategy.
    init: Literal["k-means++", "random"] = Field(
        "k-means++",
        description=("The criterion to use in forming flat clusters. "),
    )
    n_init: conint(gt=0, strict=True) = Field(
        10,
        description=(
            "Number of time the k-means algorithm will be run with "
            "different centroid seeds. The final results will be the "
            "best output of n_init consecutive runs in terms of inertia."
        ),
    )
    max_iter: conint(gt=0, strict=True) = Field(
        300,
        description=(
            "Maximum number of iterations of the k-means algorithm for a single run."
        ),
    )
    # Defaults to None (no fixed seed). NOTE(review): relies on pydantic
    # treating a None default as making the field optional; the conint
    # constraint then only applies when a value is supplied — confirm against
    # the pydantic version in use.
    random_state: conint(gt=0, strict=True) = Field(
        None,
        description=(
            "Determines random number generation for centroid initialization. "
            "Use an int to make the randomness deterministic"
        ),
    )
    # Resolved clustering callable; private so it is not user-configurable.
    _cluster_function: PyObject = PrivateAttr(kmeans_analysis)
oyvindeide marked this conversation as resolved.
Show resolved Hide resolved


class KmeansClustering(LimitedKmeansClustering):
    # Reuse the parent docstring so both variants document the same backend.
    __doc__ = LimitedKmeansClustering.__doc__
    # Discriminator used by the workflow config Union to select this model.
    type: Literal["kmeans"] = "kmeans"
    # Unlike the "limited" variant, the number of clusters is user-configurable
    # here. The previous description was copy-pasted from the fcluster config
    # and wrongly called this a "maximum": KMeans forms exactly n_clusters
    # clusters.
    n_clusters: conint(gt=0, strict=True) = Field(
        8,
        description=(
            "The number of clusters to form (KMeans forms exactly this many "
            "clusters). For the auto_scale workflow the number of clusters "
            "is derived from the PCA analysis instead and is not "
            "configurable there. "
        ),
    )
11 changes: 9 additions & 2 deletions semeio/workflows/misfit_preprocessor/workflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
LimitedHierarchicalConfig,
BaseMisfitPreprocessorConfig,
)
from semeio.workflows.misfit_preprocessor.kmeans_config import (
KmeansClustering,
LimitedKmeansClustering,
)


# pylint: disable=too-few-public-methods,no-self-argument

Expand All @@ -21,13 +26,15 @@ class PCAConfig(BaseMisfitPreprocessorConfig):

class AutoScaleConfig(BaseMisfitPreprocessorConfig):
type: Literal["auto_scale"] = "auto_scale"
clustering: LimitedHierarchicalConfig = LimitedHierarchicalConfig()
clustering: Union[
LimitedHierarchicalConfig, LimitedKmeansClustering
] = LimitedHierarchicalConfig()
pca: PCAConfig = PCAConfig()


class CustomScaleConfig(BaseMisfitPreprocessorConfig):
type: Literal["custom_scale"] = "custom_scale"
clustering: HierarchicalConfig = HierarchicalConfig()
clustering: Union[HierarchicalConfig, KmeansClustering] = HierarchicalConfig()
pca: PCAConfig = PCAConfig()


Expand Down
32 changes: 32 additions & 0 deletions semeio/workflows/spearman_correlation_job/cluster_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from scipy.cluster.hierarchy import fcluster, linkage
from sklearn.cluster import KMeans


def fcluster_analysis(
    correlation_matrix,
    threshold=1.0,
    criterion="inconsistent",
    depth=2,
    method="single",
    metric="euclidean",
):
    """Cluster the rows of ``correlation_matrix`` hierarchically.

    Builds a scipy linkage matrix and flattens it into cluster labels with
    ``scipy.cluster.hierarchy.fcluster``. The returned labels are 1-indexed,
    one label per row of the input.
    """
    return fcluster(
        linkage(correlation_matrix, method, metric),
        threshold,
        criterion=criterion,
        depth=depth,
    )


def kmeans_analysis(
    correlation_matrix,
    n_clusters=8,
    init="k-means++",
    n_init=10,
    max_iter=300,
    random_state=None,
):
    """Cluster the rows of ``correlation_matrix`` with sklearn's KMeans.

    Returns one label per row. Scikit-learn labels are 0-indexed while the
    scipy clustering used elsewhere in this module is 1-indexed, so the
    labels are shifted by one to keep both cluster functions interchangeable.
    """
    model = KMeans(
        n_clusters=n_clusters,
        init=init,
        n_init=n_init,
        max_iter=max_iter,
        random_state=random_state,
    )
    model.fit(correlation_matrix)
    # Shift to 1-indexed labels for consistency with scipy's fcluster.
    return model.labels_ + 1
20 changes: 6 additions & 14 deletions semeio/workflows/spearman_correlation_job/job.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# -*- coding: utf-8 -*-
import itertools
import logging
from scipy.cluster.hierarchy import fcluster, linkage

from semeio.workflows.spearman_correlation_job.cluster_analysis import (
fcluster_analysis,
)


def spearman_job(
measured_data,
reporter,
cluster_function=fcluster_analysis,
**cluster_args,
):
"""
Expand All @@ -21,7 +25,7 @@ def spearman_job(
correlation_matrix = _calculate_correlation_matrix(simulated_data)
reporter.publish_csv("correlation_matrix", correlation_matrix)

clusters = _cluster_analysis(correlation_matrix, **cluster_args)
clusters = cluster_function(correlation_matrix, **cluster_args)

columns = correlation_matrix.columns

Expand Down Expand Up @@ -91,15 +95,3 @@ def _calculate_correlation_matrix(data):
# of pandas (https://github.com/pandas-dev/pandas/pull/28151), for now this is
# equivalent:
return data.rank().corr(method="pearson")


def _cluster_analysis(
    correlation_matrix,
    threshold=1.15,
    criterion="inconsistent",
    depth=2,
    method="single",
    metric="euclidean",
):
    """Cluster rows of ``correlation_matrix`` via scipy hierarchical clustering.

    Returns 1-indexed cluster labels, one per row of the input. Parameters are
    forwarded to ``scipy.cluster.hierarchy.linkage`` / ``fcluster``.
    """
    # Build the hierarchical linkage matrix, then flatten it into labels.
    a = linkage(correlation_matrix, method, metric)
    return fcluster(a, threshold, criterion=criterion, depth=depth)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"numpy",
"pandas>0.24",
"pydantic",
"sklearn",
"scipy",
"xlrd<2",
"stea",
Expand Down
4 changes: 3 additions & 1 deletion tests/jobs/misfit_preprocessor/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,14 @@ def test_misfit_preprocessor_invalid_config(test_data_root):
expected_err_msg = (
"Invalid configuration of misfit preprocessor\n"
" - extra fields not permitted (workflow.clustering.threshold)\n"
# There are two clustering functions, and this is invalid in both
" - extra fields not permitted (workflow.clustering.threshold)\n"
" - extra fields not permitted (unknown_key)\n"
)
job = misfit_preprocessor.MisfitPreprocessorJob(ert)
with pytest.raises(semeio.workflows.misfit_preprocessor.ValidationError) as err:
job.run(config_file)
assert expected_err_msg == str(err.value)
assert str(err.value) == expected_err_msg


@pytest.mark.usefixtures("setup_tmpdir")
Expand Down
33 changes: 30 additions & 3 deletions tests/jobs/misfit_preprocessor/test_misfit_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,33 @@ def parameter_distribution():
return observations, simulated


@pytest.mark.parametrize("clustering_function", ["hierarchical", "kmeans"])
@pytest.mark.parametrize("method", ["custom_scale", "auto_scale"])
@pytest.mark.parametrize(
"num_polynomials",
tuple(range(1, 5)) + (20, 100),
)
def test_misfit_preprocessor_n_polynomials(num_polynomials, method):
def test_misfit_preprocessor_n_polynomials(
Copy link
Contributor

@eivindjahren eivindjahren Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case, you get a cleaner implementation with hypothesis than with pytest parametrization.
Just a suggestion:

from hypothesis import given, assume
import hypothesis.strategies as st

clustering_functions = st.sampled_from(["hierarchical", "kmeans"])
methods = st.sampled_from(["spearman_correlation", "auto_scale"])

@given(st.integers(min_value=1, max_value=100), methods, clustering_functions)
def test_misfit_preprocessor_n_polynomials(
   num_polynomials, method, clustering_function
):
    if (
        clustering_function == "kmeans"
        and method == "spearman_correlation"
     ):
       assume(num_polynomials in [4,5]) # or more

num_polynomials, method, clustering_function
):
"""
The goal of this test is to create a data set of uncorrelated polynomials,
meaning that there should be as many clusters as there are input polynomials.
"""

# If we are using kmeans where we have to specify the number of clusters,
# the default set up is not that viable, so we give the number
# of clusters.
if clustering_function == "kmeans" and method == "custom_scale":
sconfig = {"n_clusters": num_polynomials}
else:
sconfig = None

# The clustering functions for auto_scale are limited compared
# with custom_scale
if method == "auto_scale":
clustering_function = "limited_" + clustering_function

state_size = 3
poly_states = [range(1, state_size + 1) for _ in range(num_polynomials)]

Expand All @@ -169,14 +186,23 @@ def test_misfit_preprocessor_n_polynomials(num_polynomials, method):
# We set the PCA threshold to 0.99 so a high degree of correlation is required
# to have an impact. Setting it this way only has an impact for "auto_scale"
obs_keys = measured_data.data.columns.get_level_values(0)
job_config = {
"workflow": {
"type": method,
"pca": {"threshold": 0.99},
"clustering": {"type": clustering_function},
}
}
if sconfig:
job_config["workflow"]["clustering"].update(sconfig)
config = assemble_config(
{"workflow": {"type": method, "pca": {"threshold": 0.99}}},
job_config,
list(obs_keys),
)
reporter_mock = Mock()
configs = misfit_preprocessor.run(config, measured_data, reporter_mock)
assert_homogen_clusters(configs)
assert num_polynomials == len(configs), configs
assert len(configs) == num_polynomials, configs


@pytest.mark.parametrize("linkage", ["average", "single"])
Expand Down Expand Up @@ -269,6 +295,7 @@ def test_misfit_preprocessor_configuration_errors():
expected_err_msg = (
"Invalid configuration of misfit preprocessor\n"
" - extra fields not permitted (workflow.clustering.threshold)\n"
" - extra fields not permitted (workflow.clustering.threshold)\n"
" - extra fields not permitted (unknown_key)\n"
)
assert str(ve.value) == expected_err_msg
Expand Down
25 changes: 25 additions & 0 deletions tests/jobs/misfit_preprocessor/unit/test_cluster_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# pylint: disable=unbalanced-tuple-unpacking
from semeio.workflows.spearman_correlation_job.cluster_analysis import (
kmeans_analysis,
fcluster_analysis,
)
from sklearn.datasets import make_blobs
import pytest


@pytest.mark.parametrize(
    "func, kwargs",
    ((fcluster_analysis, {"criterion": "maxclust"}), (kmeans_analysis, {})),
)
def test_same_format(func, kwargs):
    """Check that both cluster functions emit 1-indexed labels.

    This does not test clustering quality, only the output format: scipy
    labels are 1-indexed while sklearn's are 0-indexed. The dataset has three
    well separated blobs, so any correct clustering produces exactly the
    label set {1, 2, 3} regardless of backend.
    """
    features, _ = make_blobs(
        n_samples=200, centers=3, cluster_std=0.1, random_state=42
    )
    labels = func(features, 3, **kwargs)
    # Label-to-blob assignment may differ between backends, so only the set
    # of emitted labels is checked.
    assert set(labels) == {1, 2, 3}
Loading