diff --git a/mephisto/abstractions/providers/mturk/mturk_utils.py b/mephisto/abstractions/providers/mturk/mturk_utils.py index 64fb92404..f385b8bf6 100644 --- a/mephisto/abstractions/providers/mturk/mturk_utils.py +++ b/mephisto/abstractions/providers/mturk/mturk_utils.py @@ -439,7 +439,8 @@ def create_hit_type( has_locale_qual = True locale_requirements += existing_qualifications - if not has_locale_qual and not client_is_sandbox(client): + is_sandbox = client_is_sandbox(client) + if not has_locale_qual and not is_sandbox: allowed_locales = get_config_arg("mturk", "allowed_locales") if allowed_locales is None: allowed_locales = [ @@ -458,6 +459,9 @@ def create_hit_type( } ) + if is_sandbox: + hit_reward = 0 + # Create the HIT type response = client.create_hit_type( AutoApprovalDelayInSeconds=auto_approve_delay, diff --git a/mephisto/abstractions/providers/prolific/api/base_api_resource.py b/mephisto/abstractions/providers/prolific/api/base_api_resource.py index 6efa20eb2..69b5ca5eb 100644 --- a/mephisto/abstractions/providers/prolific/api/base_api_resource.py +++ b/mephisto/abstractions/providers/prolific/api/base_api_resource.py @@ -99,8 +99,6 @@ def _base_request( else: result = response.json() - logger.debug(f"{log_prefix} Response: {result}") - return result except ProlificException: diff --git a/mephisto/abstractions/providers/prolific/api/participant_groups.py b/mephisto/abstractions/providers/prolific/api/participant_groups.py index bbd1928af..85aaa22eb 100644 --- a/mephisto/abstractions/providers/prolific/api/participant_groups.py +++ b/mephisto/abstractions/providers/prolific/api/participant_groups.py @@ -111,6 +111,9 @@ def remove_participants_from_group( https://docs.prolific.co/docs/api-docs/public/#tag/ Participant-Groups/paths/~1api~1v1~1participant-groups~1%7Bid%7D~1participants~1/delete """ + from mephisto.utils.logger_core import get_logger + + logger = get_logger(name=__name__) endpoint = cls.list_participants_for_group_api_endpoint.format(id=id) params = dict(participant_ids=participant_ids) response_json = cls.delete(endpoint, params=params) diff --git a/mephisto/abstractions/providers/prolific/prolific_agent.py b/mephisto/abstractions/providers/prolific/prolific_agent.py index a97a8d78c..563197714 100644 --- a/mephisto/abstractions/providers/prolific/prolific_agent.py +++ b/mephisto/abstractions/providers/prolific/prolific_agent.py @@ -92,13 +92,29 @@ def new_from_provider_data( assert isinstance(unit, ProlificUnit), "Can only register Prolific agents to Prolific units" + agent = cls.new(db, worker, unit) + unit.worker_id = worker.db_id + agent._unit = unit + task_run: "TaskRun" = agent.get_task_run() + prolific_study_id = provider_data["prolific_study_id"] prolific_submission_id = provider_data["assignment_id"] unit.register_from_provider_data(prolific_study_id, prolific_submission_id) logger.debug("Prolific Submission has been registered successfully") - return super().new_from_provider_data(db, worker, unit, provider_data) + # Check whether we need to prevent this worker from future submissions in this Task + if not worker.can_send_more_submissions_for_task(task_run): + # Excluding worker from Participant Group (instead of adding to Block List) + # only because Prolific cannot update Block List for an in-progress Study + try: + worker.exclude_worker_from_task(task_run) + except Exception: + logger.exception( + f"Failed to exclude worker {worker.db_id} in TaskRun {task_run.db_id}." + ) + + return agent def approve_work( self, @@ -241,7 +257,6 @@ def get_status(self) -> str: if prolific_submission_id: prolific_submission = prolific_utils.get_submission(client, prolific_submission_id) else: - # TODO: Not sure about this self.update_status(AgentState.STATUS_EXPIRED) return self.db_status @@ -251,6 +266,9 @@ def get_status(self) -> str: if prolific_submission.status == SubmissionStatus.RESERVED: provider_status = local_status + elif prolific_submission.status == SubmissionStatus.ACTIVE: + # We don't need to map this status in our DB + pass else: provider_status = SUBMISSION_STATUS_TO_AGENT_STATE_MAP.get( prolific_submission.status, diff --git a/mephisto/abstractions/providers/prolific/prolific_datastore.py b/mephisto/abstractions/providers/prolific/prolific_datastore.py index ee273f8b1..c3fc8eda6 100644 --- a/mephisto/abstractions/providers/prolific/prolific_datastore.py +++ b/mephisto/abstractions/providers/prolific/prolific_datastore.py @@ -321,7 +321,7 @@ def get_blocked_workers(self) -> List[dict]: results = c.fetchall() return results - def get_bloked_participant_ids(self) -> List[str]: + def get_blocked_participant_ids(self) -> List[str]: return [w["worker_id"] for w in self.get_blocked_workers()] def ensure_unit_exists(self, unit_id: str) -> None: @@ -629,7 +629,7 @@ def find_qualifications_by_ids( task_run_ids: Optional[List[str]] = None, ) -> List[dict]: """Find qualifications by Mephisto ids of qualifications and task runs""" - if not qualification_ids: + if not (qualification_ids or task_run_ids): return [] with self.table_access_condition, self._get_connection() as conn: @@ -645,12 +645,14 @@ def find_qualifications_by_ids( task_run_ids_block = "" if task_run_ids: task_run_ids_str = ",".join([f'"{tid}"' for tid in task_run_ids]) - task_run_ids_block = f"AND task_run_id IN ({task_run_ids_str})" + task_run_ids_block = f"task_run_id IN ({task_run_ids_str})" + + where_block = " AND ".join(filter(bool, [qualification_ids_block, task_run_ids_block])) c.execute( f""" SELECT * FROM qualifications - WHERE {qualification_ids_block} {task_run_ids_block}; + WHERE {where_block}; """ ) results = c.fetchall() diff --git a/mephisto/abstractions/providers/prolific/prolific_provider.py b/mephisto/abstractions/providers/prolific/prolific_provider.py index 09cab0d93..b12b46713 100644 --- a/mephisto/abstractions/providers/prolific/prolific_provider.py +++ b/mephisto/abstractions/providers/prolific/prolific_provider.py @@ -29,6 +29,7 @@ from mephisto.abstractions.providers.prolific.prolific_unit import ProlificUnit from mephisto.abstractions.providers.prolific.prolific_worker import ProlificWorker from mephisto.abstractions.providers.prolific.provider_type import PROVIDER_TYPE +from mephisto.data_model.worker import Worker from mephisto.operations.registry import register_mephisto_abstraction from mephisto.utils.logger_core import get_logger from mephisto.utils.qualifications import QualificationType @@ -44,14 +45,13 @@ from .api.exceptions import ProlificException if TYPE_CHECKING: + from mephisto.data_model.task import Task from mephisto.data_model.task_run import TaskRun from mephisto.data_model.unit import Unit - from mephisto.data_model.worker import Worker from mephisto.data_model.requester import Requester from mephisto.data_model.agent import Agent from mephisto.abstractions.blueprint import SharedTaskState - DEFAULT_FRAME_HEIGHT = 0 DEFAULT_PROLIFIC_GROUP_NAME_ALLOW_LIST = "Allow list" DEFAULT_PROLIFIC_GROUP_NAME_BLOCK_LIST = "Block list" @@ -173,12 +173,13 @@ def _get_client(self, requester_name: str) -> ProlificClient: def _get_qualified_workers( self, qualifications: List[QualificationType], - bloked_participant_ids: List[str], + blocked_participant_ids: List[str], + task_run: "TaskRun", ) -> List["Worker"]: qualified_workers = [] workers: List[Worker] = self.db.find_workers(provider_type="prolific") # `worker_name` is Prolific Participant ID in provider-specific datastore - available_workers = [w for w in workers if w.worker_name not in bloked_participant_ids] + available_workers = [w for w in workers if w.worker_name not in blocked_participant_ids] for worker in available_workers: if worker_is_qualified(worker, qualifications): @@ -213,6 +214,20 @@ def _create_participant_group_with_qualified_workers( ) return prolific_participant_group + def _get_excluded_participant_ids(self, task_run: "TaskRun") -> List[str]: + """Find participant_ids that exceeded `maximum_units_per_worker` cap within this Task""" + task: "Task" = task_run.get_task() + task_units: List["Unit"] = self.db.find_units(task_id=task.db_id) + + excluded_participant_ids: List[str] = [] + for unit in task_units: + if unit.worker_id: + worker: "Worker" = Worker.get(self.db, unit.worker_id) + if not worker.can_send_more_submissions_for_task(task_run): + excluded_participant_ids.append(worker.worker_name) + + return list(set(excluded_participant_ids)) + def setup_resources_for_task_run( self, task_run: "TaskRun", @@ -261,11 +276,12 @@ def setup_resources_for_task_run( title=args.provider.prolific_project_name, ) - blocked_participant_ids = self.datastore.get_bloked_participant_ids() - + blocked_participant_ids: List[str] = self.datastore.get_blocked_participant_ids() + excluded_participant_ids: List[str] = self._get_excluded_participant_ids(task_run) # If no Mephisto qualifications found, # we need to block Mephisto workers on Prolific as well - if blocked_participant_ids: + participant_ids_to_add_to_block_list = blocked_participant_ids + excluded_participant_ids + if participant_ids_to_add_to_block_list: new_prolific_specific_qualifications = [] # Add empty Blacklist in case if there is not in state or config blacklist_qualification = DictConfig( @@ -285,27 +301,31 @@ def setup_resources_for_task_run( whitelist_qualification = prolific_specific_qualification prev_value = whitelist_qualification["white_list"] whitelist_qualification["white_list"] = [ - p for p in prev_value if p not in blocked_participant_ids + p for p in prev_value if p not in participant_ids_to_add_to_block_list ] new_prolific_specific_qualifications.append(whitelist_qualification) elif name == ParticipantGroupEligibilityRequirement.name: # Remove blocked Participat IDs from Participant Group Eligibility Requirement client.ParticipantGroups.remove_participants_from_group( id=prolific_specific_qualification["id"], - participant_ids=blocked_participant_ids, + participant_ids=participant_ids_to_add_to_block_list, ) else: new_prolific_specific_qualifications.append(prolific_specific_qualification) # Set Blacklist Eligibility Requirement blacklist_qualification["black_list"] = list( - set(blacklist_qualification["black_list"] + blocked_participant_ids) + set(blacklist_qualification["black_list"] + participant_ids_to_add_to_block_list) ) new_prolific_specific_qualifications.append(blacklist_qualification) prolific_specific_qualifications = new_prolific_specific_qualifications if qualifications: - qualified_workers = self._get_qualified_workers(qualifications, blocked_participant_ids) + qualified_workers = self._get_qualified_workers( + qualifications, + participant_ids_to_add_to_block_list, + task_run, + ) if qualified_workers: prolific_workers_ids = [w.worker_name for w in qualified_workers] diff --git a/mephisto/abstractions/providers/prolific/prolific_utils.py b/mephisto/abstractions/providers/prolific/prolific_utils.py index 1d85cfa6c..8c02268d0 100644 --- a/mephisto/abstractions/providers/prolific/prolific_utils.py +++ b/mephisto/abstractions/providers/prolific/prolific_utils.py @@ -394,6 +394,15 @@ def compose_completion_codes(code_suffix: str) -> List[dict]: ), ], ), + dict( + code=f"{constants.StudyCodeType.OTHER}_{code_suffix}", + code_type=constants.StudyCodeType.OTHER, + actions=[ + dict( + action=constants.StudyAction.MANUALLY_REVIEW, + ), + ], + ), ] # Task info @@ -579,7 +588,10 @@ def remove_worker_qualification( *args, **kwargs, ) -> None: - """Remove a qualification for the given worker (remove a worker from a Participant Group)""" + """ + Remove a qualification for the given worker (remove a worker from a Participant Group). + NOTE: If a participant is not a member of the group, they will be ignored (from API Docs) + """ try: client.ParticipantGroups.remove_participants_from_group( id=qualification_id, @@ -592,6 +604,14 @@ def remove_worker_qualification( raise +def exclude_worker_from_participant_group( + client: ProlificClient, + worker_id: str, + participant_group_id: str, +): + remove_worker_qualification(client, worker_id, participant_group_id) + + def pay_bonus( client: ProlificClient, task_run_config: "DictConfig", diff --git a/mephisto/abstractions/providers/prolific/prolific_worker.py b/mephisto/abstractions/providers/prolific/prolific_worker.py index d0e88a8f9..b48c92a2b 100644 --- a/mephisto/abstractions/providers/prolific/prolific_worker.py +++ b/mephisto/abstractions/providers/prolific/prolific_worker.py @@ -13,8 +13,6 @@ from typing import Tuple from typing import TYPE_CHECKING -from omegaconf import DictConfig - from mephisto.abstractions.providers.prolific import prolific_utils from mephisto.abstractions.providers.prolific.api.client import ProlificClient from mephisto.abstractions.providers.prolific.provider_type import PROVIDER_TYPE @@ -28,6 +26,7 @@ from mephisto.abstractions.providers.prolific.prolific_requester import ProlificRequester from mephisto.abstractions.providers.prolific.prolific_unit import ProlificUnit from mephisto.data_model.requester import Requester + from mephisto.data_model.task import Task from mephisto.data_model.task_run import TaskRun from mephisto.data_model.unit import Unit @@ -181,6 +180,46 @@ def unblock_worker(self, reason: str, requester: "Requester") -> Tuple[bool, str return True, "" + def exclude_worker_from_task( + self, + task_run: Optional["TaskRun"] = None, + ) -> Tuple[bool, str]: + """Exclude this worker from current Task""" + logger.debug(f"{self.log_prefix}Excluding worker {self.worker_name} from Prolific") + + # 1. Get Client + requester: "ProlificRequester" = task_run.get_requester() + client = self._get_client(requester.requester_name) + + # 2. Find TaskRun IDs that are related to current Task + task: "Task" = task_run.get_task() + all_task_run_ids_for_task: List[str] = [t.db_id for t in task.get_runs()] + + # 3. Select all Participant Group IDs that are related to the Task + datastore_qualifications = self.datastore.find_qualifications_by_ids( + task_run_ids=all_task_run_ids_for_task, + ) + prolific_participant_group_ids = [ + q["prolific_participant_group_id"] for q in datastore_qualifications + ] + + logger.debug( + f"{self.log_prefix}Found {len(prolific_participant_group_ids)} Participant Groups: " + f"{prolific_participant_group_ids}" + ) + + # 4. Exclude the Worker from Prolific Participant Groups + for prolific_participant_group_id in prolific_participant_group_ids: + prolific_utils.exclude_worker_from_participant_group( + client, + self.worker_name, + prolific_participant_group_id, + ) + + logger.debug(f"{self.log_prefix}Worker {self.worker_name} excluded") + + return True, "" + def is_blocked(self, requester: "Requester") -> bool: """Determine if a worker is blocked""" task_run = self._get_first_task_run(requester) diff --git a/mephisto/data_model/agent.py b/mephisto/data_model/agent.py index 81afa2044..f9324a599 100644 --- a/mephisto/data_model/agent.py +++ b/mephisto/data_model/agent.py @@ -536,6 +536,17 @@ def new_from_provider_data( agent = cls.new(db, worker, unit) unit.worker_id = worker.db_id agent._unit = unit + + # Prevent sending more units to worker if worker exceeded submission cap within this Task + task_run: "TaskRun" = agent.get_task_run() + if not worker.can_send_more_submissions_for_task(task_run): + try: + worker.exclude_worker_from_task(task_run) + except Exception: + logger.exception( + f"Failed to exclude worker {worker.db_id} in TaskRun {task_run.db_id}." + ) + return agent def get_status(self) -> str: diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 73a37d74f..7fb860b39 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -264,10 +264,14 @@ def get_valid_units_for_worker(self, worker: "Worker") -> List["Unit"]: # Cannot pair with self units: List["Unit"] = [] - for unit_set in unit_assigns.values(): - is_self_set = map(lambda u: u.worker_id == worker.db_id, unit_set) - if not any(is_self_set): - units += unit_set + for unit_list in unit_assigns.values(): + self_linked_units = [ + u + for u in unit_list + if u.worker_id == worker.db_id and u.db_status == AssignmentState.LAUNCHED + ] + if not self_linked_units: + units += unit_list # Valid units must be launched and must not be special units (negative indices) # Can use db_status directly rather than polling in the critical path, as in diff --git a/mephisto/data_model/worker.py b/mephisto/data_model/worker.py index 2b1f9bb7f..6f41ef223 100644 --- a/mephisto/data_model/worker.py +++ b/mephisto/data_model/worker.py @@ -12,6 +12,8 @@ MephistoDataModelComponentMixin, ) from typing import Any, List, Optional, Mapping, Tuple, Dict, Type, Tuple, TYPE_CHECKING + +from mephisto.data_model.constants.assignment_state import AssignmentState from mephisto.utils.logger_core import get_logger logger = get_logger(name=__name__) @@ -259,6 +261,17 @@ def unblock_worker(self, reason: str, requester: "Requester") -> bool: """unblock a blocked worker for the specified reason""" raise NotImplementedError() + def exclude_worker_from_task( + self, + task_run: Optional["TaskRun"] = None, + ) -> Tuple[bool, str]: + """ + Prevent this worker from further participation in current Task. + (Note that scope of exclusion is only within the current Task, + whereas block lists or altering worker qualifications would affect future Tasks.) + """ + pass + def is_blocked(self, requester: "Requester") -> bool: """Determine if a worker is blocked""" raise NotImplementedError() @@ -267,6 +280,24 @@ def is_eligible(self, task_run: "TaskRun") -> bool: """Determine if this worker is eligible for the given task run""" raise NotImplementedError() + def can_send_more_submissions_for_task(self, task_run: "TaskRun") -> bool: + """Check whether a worker is allowed to send any more submissions within current Task""" + maximum_units_per_worker = task_run.args.task.maximum_units_per_worker + + # By default, worker can send any amount of submissions + if maximum_units_per_worker == 0: + return True + + # Find all completed units byt this worker for current task + task_units = self.db.find_units(task_id=task_run.task_id, worker_id=self.db_id) + completed_task_units = [ + u for u in task_units if u.get_status() in AssignmentState.completed() + ] + if len(completed_task_units) >= maximum_units_per_worker - 1: + return False + + return True + def register(self, args: Optional[Dict[str, str]] = None) -> None: """Register this worker with the crowdprovider, if necessary""" pass