From de9e7ae33d1301751a44454575e69897740c5a57 Mon Sep 17 00:00:00 2001 From: Paul Abumov Date: Thu, 19 Oct 2023 11:18:59 -0400 Subject: [PATCH 1/4] Prevent oversampling of study submissions by any single worker --- .../providers/prolific/prolific_agent.py | 30 ++++++++++++- .../providers/prolific/prolific_provider.py | 11 ++++- .../providers/prolific/prolific_utils.py | 13 +++++- .../providers/prolific/prolific_worker.py | 43 ++++++++++++++++++- mephisto/data_model/agent.py | 12 ++++++ mephisto/data_model/task_run.py | 10 +++++ mephisto/data_model/worker.py | 32 ++++++++++++++ 7 files changed, 145 insertions(+), 6 deletions(-) diff --git a/mephisto/abstractions/providers/prolific/prolific_agent.py b/mephisto/abstractions/providers/prolific/prolific_agent.py index 0c43b7f55..bf5c84c31 100644 --- a/mephisto/abstractions/providers/prolific/prolific_agent.py +++ b/mephisto/abstractions/providers/prolific/prolific_agent.py @@ -92,13 +92,41 @@ def new_from_provider_data( assert isinstance(unit, ProlificUnit), "Can only register Prolific agents to Prolific units" + agent = cls.new(db, worker, unit) + unit.worker_id = worker.db_id + agent._unit = unit + task_run: "TaskRun" = agent.get_task_run() + + # In case provider API wasn't responsive, we ensure this submission + # doesn't exceed per-worker cap for this Task. Othewrwise don't process submission. + if not worker.can_send_more_submissions_for_task(task_run): + logger.info( + f'Submission from worker "{worker.db_id}" is over the Task\'s submission cap.' + ) + try: + worker.exclude_worker_from_task(task_run) + except Exception: + logger.exception( + f"Failed to exclude worker {worker.db_id} in TaskRun {task_run.db_id}." + ) + return agent + prolific_study_id = provider_data["prolific_study_id"] prolific_submission_id = provider_data["assignment_id"] unit.register_from_provider_data(prolific_study_id, prolific_submission_id) logger.debug("Prolific Submission has been registered successfully") - return super().new_from_provider_data(db, worker, unit, provider_data) + # Check whether we need to prevent this worker from future submissions in this Task + if not worker.can_send_more_submissions_for_task(task_run): + try: + worker.exclude_worker_from_task(task_run) + except Exception: + logger.exception( + f"Failed to exclude worker {worker.db_id} in TaskRun {task_run.db_id}." + ) + + return agent def approve_work( self, diff --git a/mephisto/abstractions/providers/prolific/prolific_provider.py b/mephisto/abstractions/providers/prolific/prolific_provider.py index ca0ee7564..5a99dfc86 100644 --- a/mephisto/abstractions/providers/prolific/prolific_provider.py +++ b/mephisto/abstractions/providers/prolific/prolific_provider.py @@ -174,6 +174,7 @@ def _get_qualified_workers( self, qualifications: List[QualificationType], bloked_participant_ids: List[str], + task_run: "TaskRun", ) -> List["Worker"]: qualified_workers = [] workers: List[Worker] = self.db.find_workers(provider_type="prolific") @@ -181,7 +182,9 @@ def _get_qualified_workers( available_workers = [w for w in workers if w.worker_name not in bloked_participant_ids] for worker in available_workers: - if worker_is_qualified(worker, qualifications): + if worker.can_send_more_submissions_for_task(task_run) and worker_is_qualified( + worker, qualifications + ): qualified_workers.append(worker) return qualified_workers @@ -305,7 +308,11 @@ def setup_resources_for_task_run( prolific_specific_qualifications = new_prolific_specific_qualifications if qualifications: - qualified_workers = self._get_qualified_workers(qualifications, blocked_participant_ids) + qualified_workers = self._get_qualified_workers( + qualifications, + blocked_participant_ids, + task_run, + ) if qualified_workers: prolific_workers_ids = [w.worker_name for w in qualified_workers] diff --git a/mephisto/abstractions/providers/prolific/prolific_utils.py b/mephisto/abstractions/providers/prolific/prolific_utils.py index 477d4c7ed..30cd85a8d 100644 --- a/mephisto/abstractions/providers/prolific/prolific_utils.py +++ b/mephisto/abstractions/providers/prolific/prolific_utils.py @@ -578,7 +578,10 @@ def remove_worker_qualification( *args, **kwargs, ) -> None: - """Remove a qualification for the given worker (remove a worker from a Participant Group)""" + """ + Remove a qualification for the given worker (remove a worker from a Participant Group). + NOTE: If a participant is not a member of the group, they will be ignored (from API Docs) + """ try: client.ParticipantGroups.remove_participants_from_group( id=qualification_id, @@ -591,6 +594,14 @@ def remove_worker_qualification( raise +def exclude_worker_from_participant_group( + client: ProlificClient, + worker_id: str, + participant_group_id: str, +): + remove_worker_qualification(client, worker_id, participant_group_id) + + def pay_bonus( client: ProlificClient, task_run_config: "DictConfig", diff --git a/mephisto/abstractions/providers/prolific/prolific_worker.py b/mephisto/abstractions/providers/prolific/prolific_worker.py index 1df4b1ab1..5c178dc78 100644 --- a/mephisto/abstractions/providers/prolific/prolific_worker.py +++ b/mephisto/abstractions/providers/prolific/prolific_worker.py @@ -13,8 +13,6 @@ from typing import Tuple from typing import TYPE_CHECKING -from omegaconf import DictConfig - from mephisto.abstractions.providers.prolific import prolific_utils from mephisto.abstractions.providers.prolific.api.client import ProlificClient from mephisto.abstractions.providers.prolific.provider_type import PROVIDER_TYPE @@ -28,6 +26,7 @@ from mephisto.abstractions.providers.prolific.prolific_requester import ProlificRequester from mephisto.abstractions.providers.prolific.prolific_unit import ProlificUnit from mephisto.data_model.requester import Requester + from mephisto.data_model.task import Task from mephisto.data_model.task_run import TaskRun from mephisto.data_model.unit import Unit @@ -181,6 +180,46 @@ def unblock_worker(self, reason: str, requester: "Requester") -> Tuple[bool, str return True, "" + def exclude_worker_from_task( + self, + task_run: Optional["TaskRun"] = None, + ) -> Tuple[bool, str]: + """Exclude this worker from current Task""" + logger.debug(f"{self.log_prefix}Excluding worker {self.worker_name} from Prolific") + + # 1. Get Client + requester: "ProlificRequester" = task_run.get_requester() + client = self._get_client(requester.requester_name) + + # 2. Find TaskRun IDs that are related to current Task + task: "Task" = task_run.get_task() + all_task_run_ids_for_task: List[str] = [t.db_id for t in task.get_runs()] + + # 3. Select all Participant Group IDs that are related to the Task + datastore_qualifications = self.datastore.find_qualifications_by_ids( + task_run_ids=all_task_run_ids_for_task, + ) + prolific_participant_group_ids = [ + q["prolific_participant_group_id"] for q in datastore_qualifications + ] + + logger.debug( + f"{self.log_prefix}Found {len(prolific_participant_group_ids)} Participant Groups: " + f"{prolific_participant_group_ids}" + ) + + # 4. Exclude the Worker from Prolific Participant Groups + for prolific_participant_group_id in prolific_participant_group_ids: + prolific_utils.exclude_worker_from_participant_group( + client, + self.worker_name, + prolific_participant_group_id, + ) + + logger.debug(f"{self.log_prefix}Worker {self.worker_name} excluded") + + return True, "" + def is_blocked(self, requester: "Requester") -> bool: """Determine if a worker is blocked""" task_run = self._get_first_task_run(requester) diff --git a/mephisto/data_model/agent.py b/mephisto/data_model/agent.py index 70038efb7..43ff863d9 100644 --- a/mephisto/data_model/agent.py +++ b/mephisto/data_model/agent.py @@ -536,6 +536,18 @@ def new_from_provider_data( agent = cls.new(db, worker, unit) unit.worker_id = worker.db_id agent._unit = unit + + # In case provider API wasn't responsive, we ensure this submission + # doesn't exceed per-worker cap for this Task. Othewrwise don't process submission. + task_run: "TaskRun" = agent.get_task_run() + if not worker.can_send_more_submissions_for_task(task_run): + try: + worker.exclude_worker_from_task(task_run) + except Exception: + logger.exception( + f"Failed to exclude worker {worker.db_id} in TaskRun {task_run.db_id}." + ) + return agent def get_status(self) -> str: diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 476cc917b..75c761a9c 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -147,6 +147,16 @@ class TaskRunArgs: }, ) + max_submissions_per_worker: Optional[int] = field( + default=None, + metadata={ + "help": ( + "Maximum submissions that a worker can submit on across all " + "tasks that share this task_name. (0 is infinite)" + ) + }, + ) + @classmethod def get_mock_params(cls) -> str: """Returns a param string with default / mock arguments to use for testing""" diff --git a/mephisto/data_model/worker.py b/mephisto/data_model/worker.py index 939019f16..fc19d0744 100644 --- a/mephisto/data_model/worker.py +++ b/mephisto/data_model/worker.py @@ -12,6 +12,8 @@ MephistoDataModelComponentMixin, ) from typing import Any, List, Optional, Mapping, Tuple, Dict, Type, Tuple, TYPE_CHECKING + +from mephisto.data_model.constants.assignment_state import AssignmentState from mephisto.utils.logger_core import get_logger logger = get_logger(name=__name__) @@ -259,6 +261,17 @@ def unblock_worker(self, reason: str, requester: "Requester") -> bool: """unblock a blocked worker for the specified reason""" raise NotImplementedError() + def exclude_worker_from_task( + self, + task_run: Optional["TaskRun"] = None, + ) -> Tuple[bool, str]: + """ + Prevent this worker from further participation in current Task. + (Note that scope of exclusion is only within the current Task, + whereas block lists or altering worker qualifications would affect future Tasks.) + """ + pass + def is_blocked(self, requester: "Requester") -> bool: """Determine if a worker is blocked""" raise NotImplementedError() @@ -267,6 +280,25 @@ def is_eligible(self, task_run: "TaskRun") -> bool: """Determine if this worker is eligible for the given task run""" raise NotImplementedError() + def can_send_more_submissions_for_task(self, task_run: "TaskRun") -> bool: + """Check whether a worker is allowed to send any more submissions within current Task""" + max_submissions_per_worker = task_run.args.max_submissions_per_worker + + # By default, worker can send any amount of submissions + if max_submissions_per_worker is None: + return True + + # Find all completed units byt this worker for current task + task_units = self.db.find_units(task_id=task_run.task_id, worker_id=self.db_id) + completed_task_units = [ + u for u in task_units if u.get_status() in AssignmentState.completed() + ] + + if len(completed_task_units) >= max_submissions_per_worker: + return False + + return True + def register(self, args: Optional[Dict[str, str]] = None) -> None: """Register this worker with the crowdprovider, if necessary""" pass From 31374cae7dafd024c0045b28d8d6ddd7120da3a9 Mon Sep 17 00:00:00 2001 From: Paul Abumov Date: Wed, 25 Oct 2023 20:22:37 -0400 Subject: [PATCH 2/4] Consolidated parameter names for max submissions per worker --- mephisto/data_model/task_run.py | 10 ---------- mephisto/data_model/worker.py | 6 +++--- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 75c761a9c..476cc917b 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -147,16 +147,6 @@ class TaskRunArgs: }, ) - max_submissions_per_worker: Optional[int] = field( - default=None, - metadata={ - "help": ( - "Maximum submissions that a worker can submit on across all " - "tasks that share this task_name. (0 is infinite)" - ) - }, - ) - @classmethod def get_mock_params(cls) -> str: """Returns a param string with default / mock arguments to use for testing""" diff --git a/mephisto/data_model/worker.py b/mephisto/data_model/worker.py index fc19d0744..6a27a7061 100644 --- a/mephisto/data_model/worker.py +++ b/mephisto/data_model/worker.py @@ -282,10 +282,10 @@ def is_eligible(self, task_run: "TaskRun") -> bool: def can_send_more_submissions_for_task(self, task_run: "TaskRun") -> bool: """Check whether a worker is allowed to send any more submissions within current Task""" - max_submissions_per_worker = task_run.args.max_submissions_per_worker + maximum_units_per_worker = task_run.args.task.maximum_units_per_worker # By default, worker can send any amount of submissions - if max_submissions_per_worker is None: + if maximum_units_per_worker == 0: return True # Find all completed units byt this worker for current task @@ -294,7 +294,7 @@ def can_send_more_submissions_for_task(self, task_run: "TaskRun") -> bool: u for u in task_units if u.get_status() in AssignmentState.completed() ] - if len(completed_task_units) >= max_submissions_per_worker: + if len(completed_task_units) >= maximum_units_per_worker: return False return True From b52796141897d70b518482bbd4b98c04dcd8210f Mon Sep 17 00:00:00 2001 From: Paul Abumov Date: Wed, 1 Nov 2023 14:53:21 -0400 Subject: [PATCH 3/4] Fixed excluding workers in studies with no qualifications --- .../prolific/api/base_api_resource.py | 2 - .../prolific/api/participant_groups.py | 2 + .../providers/prolific/prolific_agent.py | 20 +++------ .../providers/prolific/prolific_datastore.py | 10 +++-- .../providers/prolific/prolific_provider.py | 42 ++++++++++++------- .../providers/prolific/prolific_utils.py | 9 ++++ mephisto/data_model/agent.py | 3 +- mephisto/data_model/task_run.py | 12 ++++-- mephisto/data_model/worker.py | 3 +- 9 files changed, 60 insertions(+), 43 deletions(-) diff --git a/mephisto/abstractions/providers/prolific/api/base_api_resource.py b/mephisto/abstractions/providers/prolific/api/base_api_resource.py index 959144051..a0cfaa440 100644 --- a/mephisto/abstractions/providers/prolific/api/base_api_resource.py +++ b/mephisto/abstractions/providers/prolific/api/base_api_resource.py @@ -99,8 +99,6 @@ def _base_request( else: result = response.json() - logger.debug(f"{log_prefix} Response: {result}") - return result except ProlificException: diff --git a/mephisto/abstractions/providers/prolific/api/participant_groups.py b/mephisto/abstractions/providers/prolific/api/participant_groups.py index be2664977..ac9ccd148 100644 --- a/mephisto/abstractions/providers/prolific/api/participant_groups.py +++ b/mephisto/abstractions/providers/prolific/api/participant_groups.py @@ -111,6 +111,8 @@ def remove_participants_from_group( https://docs.prolific.co/docs/api-docs/public/#tag/ Participant-Groups/paths/~1api~1v1~1participant-groups~1%7Bid%7D~1participants~1/delete """ + from mephisto.utils.logger_core import get_logger + logger = get_logger(name=__name__) endpoint = cls.list_participants_for_group_api_endpoint.format(id=id) params = dict(participant_ids=participant_ids) response_json = cls.delete(endpoint, params=params) diff --git a/mephisto/abstractions/providers/prolific/prolific_agent.py b/mephisto/abstractions/providers/prolific/prolific_agent.py index bf5c84c31..2df736114 100644 --- a/mephisto/abstractions/providers/prolific/prolific_agent.py +++ b/mephisto/abstractions/providers/prolific/prolific_agent.py @@ -97,20 +97,6 @@ def new_from_provider_data( agent._unit = unit task_run: "TaskRun" = agent.get_task_run() - # In case provider API wasn't responsive, we ensure this submission - # doesn't exceed per-worker cap for this Task. Othewrwise don't process submission. - if not worker.can_send_more_submissions_for_task(task_run): - logger.info( - f'Submission from worker "{worker.db_id}" is over the Task\'s submission cap.' - ) - try: - worker.exclude_worker_from_task(task_run) - except Exception: - logger.exception( - f"Failed to exclude worker {worker.db_id} in TaskRun {task_run.db_id}." - ) - return agent - prolific_study_id = provider_data["prolific_study_id"] prolific_submission_id = provider_data["assignment_id"] unit.register_from_provider_data(prolific_study_id, prolific_submission_id) @@ -119,6 +105,8 @@ def new_from_provider_data( # Check whether we need to prevent this worker from future submissions in this Task if not worker.can_send_more_submissions_for_task(task_run): + # Excluding worker from Participant Group (instead of adding to Block List) + # only because Prolific cannot update Block List for an in-progress Study try: worker.exclude_worker_from_task(task_run) except Exception: @@ -267,7 +255,6 @@ def get_status(self) -> str: if prolific_submission_id: prolific_submission = prolific_utils.get_submission(client, prolific_submission_id) else: - # TODO: Not sure about this self.update_status(AgentState.STATUS_EXPIRED) return self.db_status @@ -277,6 +264,9 @@ def get_status(self) -> str: if prolific_submission.status == SubmissionStatus.RESERVED: provider_status = local_status + elif prolific_submission.status == SubmissionStatus.ACTIVE: + # We don't need to map this status in our DB + pass else: provider_status = SUBMISSION_STATUS_TO_AGENT_STATE_MAP.get( prolific_submission.status, diff --git a/mephisto/abstractions/providers/prolific/prolific_datastore.py b/mephisto/abstractions/providers/prolific/prolific_datastore.py index 486a479fa..09fe76772 100644 --- a/mephisto/abstractions/providers/prolific/prolific_datastore.py +++ b/mephisto/abstractions/providers/prolific/prolific_datastore.py @@ -321,7 +321,7 @@ def get_blocked_workers(self) -> List[dict]: results = c.fetchall() return results - def get_bloked_participant_ids(self) -> List[str]: + def get_blocked_participant_ids(self) -> List[str]: return [w["worker_id"] for w in self.get_blocked_workers()] def ensure_unit_exists(self, unit_id: str) -> None: @@ -629,7 +629,7 @@ def find_qualifications_by_ids( task_run_ids: Optional[List[str]] = None, ) -> List[dict]: """Find qualifications by Mephisto ids of qualifications and task runs""" - if not qualification_ids: + if not (qualification_ids or task_run_ids): return [] with self.table_access_condition, self._get_connection() as conn: @@ -645,12 +645,14 @@ def find_qualifications_by_ids( task_run_ids_block = "" if task_run_ids: task_run_ids_str = ",".join([f'"{tid}"' for tid in task_run_ids]) - task_run_ids_block = f"AND task_run_id IN ({task_run_ids_str})" + task_run_ids_block = f"task_run_id IN ({task_run_ids_str})" + + where_block = " AND ".join(filter(bool, [qualification_ids_block, task_run_ids_block])) c.execute( f""" SELECT * FROM qualifications - WHERE {qualification_ids_block} {task_run_ids_block}; + WHERE {where_block}; """ ) results = c.fetchall() diff --git a/mephisto/abstractions/providers/prolific/prolific_provider.py b/mephisto/abstractions/providers/prolific/prolific_provider.py index 5a99dfc86..cbba85fde 100644 --- a/mephisto/abstractions/providers/prolific/prolific_provider.py +++ b/mephisto/abstractions/providers/prolific/prolific_provider.py @@ -29,6 +29,7 @@ from mephisto.abstractions.providers.prolific.prolific_unit import ProlificUnit from mephisto.abstractions.providers.prolific.prolific_worker import ProlificWorker from mephisto.abstractions.providers.prolific.provider_type import PROVIDER_TYPE +from mephisto.data_model.worker import Worker from mephisto.operations.registry import register_mephisto_abstraction from mephisto.utils.logger_core import get_logger from mephisto.utils.qualifications import QualificationType @@ -44,14 +45,13 @@ from .api.exceptions import ProlificException if TYPE_CHECKING: + from mephisto.data_model.task import Task from mephisto.data_model.task_run import TaskRun from mephisto.data_model.unit import Unit - from mephisto.data_model.worker import Worker from mephisto.data_model.requester import Requester from mephisto.data_model.agent import Agent from mephisto.abstractions.blueprint import SharedTaskState - DEFAULT_FRAME_HEIGHT = 0 DEFAULT_PROLIFIC_GROUP_NAME_ALLOW_LIST = "Allow list" DEFAULT_PROLIFIC_GROUP_NAME_BLOCK_LIST = "Block list" @@ -173,18 +173,16 @@ def _get_client(self, requester_name: str) -> ProlificClient: def _get_qualified_workers( self, qualifications: List[QualificationType], - bloked_participant_ids: List[str], + blocked_participant_ids: List[str], task_run: "TaskRun", ) -> List["Worker"]: qualified_workers = [] workers: List[Worker] = self.db.find_workers(provider_type="prolific") # `worker_name` is Prolific Participant ID in provider-specific datastore - available_workers = [w for w in workers if w.worker_name not in bloked_participant_ids] + available_workers = [w for w in workers if w.worker_name not in blocked_participant_ids] for worker in available_workers: - if worker.can_send_more_submissions_for_task(task_run) and worker_is_qualified( - worker, qualifications - ): + if worker_is_qualified(worker, qualifications): qualified_workers.append(worker) return qualified_workers @@ -216,6 +214,21 @@ def _create_participant_group_with_qualified_workers( ) return prolific_participant_group + def _get_excluded_participant_ids(self, task_run: "TaskRun") -> List[str]: + """ Find participant_ids that exceeded `maximum_units_per_worker` cap within this Task + """ + task: "Task" = task_run.get_task() + task_units: List["Unit"] = self.db.find_units(task_id=task.db_id) + + excluded_participant_ids: List[str] = [] + for unit in task_units: + if unit.worker_id: + worker: "Worker" = Worker.get(self.db, unit.worker_id) + if not worker.can_send_more_submissions_for_task(task_run): + excluded_participant_ids.append(worker.worker_name) + + return list(set(excluded_participant_ids)) + def setup_resources_for_task_run( self, task_run: "TaskRun", @@ -264,11 +277,12 @@ def setup_resources_for_task_run( title=args.provider.prolific_project_name, ) - blocked_participant_ids = self.datastore.get_bloked_participant_ids() - + blocked_participant_ids: List[str] = self.datastore.get_blocked_participant_ids() + excluded_participant_ids: List[str] = self._get_excluded_participant_ids(task_run) # If no Mephisto qualifications found, # we need to block Mephisto workers on Prolific as well - if blocked_participant_ids: + participant_ids_to_add_to_block_list = blocked_participant_ids + excluded_participant_ids + if participant_ids_to_add_to_block_list: new_prolific_specific_qualifications = [] # Add empty Blacklist in case if there is not in state or config blacklist_qualification = DictConfig( @@ -288,21 +302,21 @@ def setup_resources_for_task_run( whitelist_qualification = prolific_specific_qualification prev_value = whitelist_qualification["white_list"] whitelist_qualification["white_list"] = [ - p for p in prev_value if p not in blocked_participant_ids + p for p in prev_value if p not in participant_ids_to_add_to_block_list ] new_prolific_specific_qualifications.append(whitelist_qualification) elif name == ParticipantGroupEligibilityRequirement.name: # Remove blocked Participat IDs from Participant Group Eligibility Requirement client.ParticipantGroups.remove_participants_from_group( id=prolific_specific_qualification["id"], - participant_ids=blocked_participant_ids, + participant_ids=participant_ids_to_add_to_block_list, ) else: new_prolific_specific_qualifications.append(prolific_specific_qualification) # Set Blacklist Eligibility Requirement blacklist_qualification["black_list"] = list( - set(blacklist_qualification["black_list"] + blocked_participant_ids) + set(blacklist_qualification["black_list"] + participant_ids_to_add_to_block_list) ) new_prolific_specific_qualifications.append(blacklist_qualification) prolific_specific_qualifications = new_prolific_specific_qualifications @@ -310,7 +324,7 @@ def setup_resources_for_task_run( if qualifications: qualified_workers = self._get_qualified_workers( qualifications, - blocked_participant_ids, + participant_ids_to_add_to_block_list, task_run, ) diff --git a/mephisto/abstractions/providers/prolific/prolific_utils.py b/mephisto/abstractions/providers/prolific/prolific_utils.py index 30cd85a8d..ac6946849 100644 --- a/mephisto/abstractions/providers/prolific/prolific_utils.py +++ b/mephisto/abstractions/providers/prolific/prolific_utils.py @@ -393,6 +393,15 @@ def compose_completion_codes(code_suffix: str) -> List[dict]: ), ], ), + dict( + code=f"{constants.StudyCodeType.OTHER}_{code_suffix}", + code_type=constants.StudyCodeType.OTHER, + actions=[ + dict( + action=constants.StudyAction.MANUALLY_REVIEW, + ), + ], + ), ] # Task info diff --git a/mephisto/data_model/agent.py b/mephisto/data_model/agent.py index 43ff863d9..1b19b065e 100644 --- a/mephisto/data_model/agent.py +++ b/mephisto/data_model/agent.py @@ -537,8 +537,7 @@ def new_from_provider_data( unit.worker_id = worker.db_id agent._unit = unit - # In case provider API wasn't responsive, we ensure this submission - # doesn't exceed per-worker cap for this Task. Othewrwise don't process submission. + # Prevent sending more units to worker if worker exceeded submission cap within this Task task_run: "TaskRun" = agent.get_task_run() if not worker.can_send_more_submissions_for_task(task_run): try: diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 476cc917b..abaf539d3 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -264,10 +264,14 @@ def get_valid_units_for_worker(self, worker: "Worker") -> List["Unit"]: # Cannot pair with self units: List["Unit"] = [] - for unit_set in unit_assigns.values(): - is_self_set = map(lambda u: u.worker_id == worker.db_id, unit_set) - if not any(is_self_set): - units += unit_set + for unit_list in unit_assigns.values(): + self_linked_units = [ + u + for u in unit_list + if u.worker_id == worker.db_id and u.db_status == AssignmentState.LAUNCHED + ] + if not self_linked_units: + units += unit_list # Valid units must be launched and must not be special units (negative indices) # Can use db_status directly rather than polling in the critical path, as in diff --git a/mephisto/data_model/worker.py b/mephisto/data_model/worker.py index 6a27a7061..ce0687159 100644 --- a/mephisto/data_model/worker.py +++ b/mephisto/data_model/worker.py @@ -293,8 +293,7 @@ def can_send_more_submissions_for_task(self, task_run: "TaskRun") -> bool: completed_task_units = [ u for u in task_units if u.get_status() in AssignmentState.completed() ] - - if len(completed_task_units) >= maximum_units_per_worker: + if len(completed_task_units) >= maximum_units_per_worker - 1: return False return True From c5c5bc0346c92e690947cd06f402dd72446584b4 Mon Sep 17 00:00:00 2001 From: Paul Abumov Date: Thu, 16 Nov 2023 18:41:36 -0500 Subject: [PATCH 4/4] Added sanity check for Mturk sandbox testing --- mephisto/abstractions/providers/mturk/mturk_utils.py | 6 +++++- .../providers/prolific/api/participant_groups.py | 1 + .../abstractions/providers/prolific/prolific_provider.py | 3 +-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/mephisto/abstractions/providers/mturk/mturk_utils.py b/mephisto/abstractions/providers/mturk/mturk_utils.py index 2cc85246e..6a3109522 100644 --- a/mephisto/abstractions/providers/mturk/mturk_utils.py +++ b/mephisto/abstractions/providers/mturk/mturk_utils.py @@ -439,7 +439,8 @@ def create_hit_type( has_locale_qual = True locale_requirements += existing_qualifications - if not has_locale_qual and not client_is_sandbox(client): + is_sandbox = client_is_sandbox(client) + if not has_locale_qual and not is_sandbox: allowed_locales = get_config_arg("mturk", "allowed_locales") if allowed_locales is None: allowed_locales = [ @@ -458,6 +459,9 @@ def create_hit_type( } ) + if is_sandbox: + hit_reward = 0 + # Create the HIT type response = client.create_hit_type( AutoApprovalDelayInSeconds=auto_approve_delay, diff --git a/mephisto/abstractions/providers/prolific/api/participant_groups.py b/mephisto/abstractions/providers/prolific/api/participant_groups.py index ac9ccd148..37ad2363f 100644 --- a/mephisto/abstractions/providers/prolific/api/participant_groups.py +++ b/mephisto/abstractions/providers/prolific/api/participant_groups.py @@ -112,6 +112,7 @@ def remove_participants_from_group( Participant-Groups/paths/~1api~1v1~1participant-groups~1%7Bid%7D~1participants~1/delete """ from mephisto.utils.logger_core import get_logger + logger = get_logger(name=__name__) endpoint = cls.list_participants_for_group_api_endpoint.format(id=id) params = dict(participant_ids=participant_ids) diff --git a/mephisto/abstractions/providers/prolific/prolific_provider.py b/mephisto/abstractions/providers/prolific/prolific_provider.py index cbba85fde..f7153807c 100644 --- a/mephisto/abstractions/providers/prolific/prolific_provider.py +++ b/mephisto/abstractions/providers/prolific/prolific_provider.py @@ -215,8 +215,7 @@ def _create_participant_group_with_qualified_workers( return prolific_participant_group def _get_excluded_participant_ids(self, task_run: "TaskRun") -> List[str]: - """ Find participant_ids that exceeded `maximum_units_per_worker` cap within this Task - """ + """Find participant_ids that exceeded `maximum_units_per_worker` cap within this Task""" task: "Task" = task_run.get_task() task_units: List["Unit"] = self.db.find_units(task_id=task.db_id)