Join Map Lane not responding after a high volume of usage #36

Open
SirCipher opened this issue Dec 6, 2019 · 1 comment

@SirCipher
Member

When using a JoinMapLane, putting a large number of entries (more than 9000) into the map in one go causes either the observers to stop being called or the entries not to be written. Waiting about 30 seconds and then using the map again makes no difference: still nothing happens. With a 10 ms delay between entries, the problem is not observed.
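
For reference, the delayed variant that avoids the problem looks roughly like the sketch below. `ThrottledPut` and `putThrottled` are hypothetical names, not part of Swim, and the helper assumes the downlink can be passed as a plain `java.util.Map`; if that does not hold, the same loop can call `put` on the downlink directly.

```java
import java.util.Map;

// Hypothetical helper (not part of Swim): writes entries one at a time with a pause between them.
final class ThrottledPut {
  private ThrottledPut() { }

  // Puts keys "0".."count-1" (value equal to the key) into target, sleeping delayMillis
  // after each put. Returns the number of entries written; this is less than count only
  // if the thread is interrupted while sleeping.
  static int putThrottled(Map<String, String> target, int count, long delayMillis) {
    for (int i = 0; i < count; i++) {
      final String entry = Integer.toString(i);
      target.put(entry, entry);
      if (delayMillis > 0) {
        try {
          Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop early.
          Thread.currentThread().interrupt();
          return i + 1;
        }
      }
    }
    return count;
  }
}
```

Called as `ThrottledPut.putThrottled(xs, 15000, 10)`, this takes at least 150 seconds for 15000 entries, so it is only useful for confirming that the failure depends on the write rate, not as a fix.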

This needs to be investigated; a reproducible example will be posted here.

@SirCipher SirCipher added the C-bug Category: bug label Dec 6, 2019
@SirCipher SirCipher self-assigned this Dec 6, 2019
@SirCipher SirCipher changed the title from "Join Map Lane stalling after a high volume of useage" to "Join Map Lane halting after a high volume of usage" Dec 6, 2019
@SirCipher SirCipher changed the title from "Join Map Lane halting after a high volume of usage" to "Join Map Lane not responding after a high volume of usage" Dec 6, 2019
@SirCipher
Member Author

A minimal example of this issue:

// Copyright 2015-2019 SWIM.AI inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package swim.server;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import swim.actor.ActorSpaceDef;
import swim.api.SwimLane;
import swim.api.SwimRoute;
import swim.api.agent.AbstractAgent;
import swim.api.agent.AgentRoute;
import swim.api.downlink.MapDownlink;
import swim.api.lane.JoinMapLane;
import swim.api.lane.MapLane;
import swim.api.plane.AbstractPlane;
import swim.codec.Format;
import swim.kernel.Kernel;
import swim.observable.function.DidUpdateKey;
import swim.observable.function.WillUpdateKey;
import swim.service.web.WebServiceDef;
import java.util.concurrent.CountDownLatch;
import static org.testng.Assert.fail;

public class JoinMapLaneSpec {

  static class TestMapLaneAgent extends AbstractAgent {
    @SwimLane("map")
    MapLane<String, String> testMap = this.<String, String>mapLane()
        .observe(new TestMapLaneController());

    class TestMapLaneController implements WillUpdateKey<String, String>, DidUpdateKey<String, String> {
      @Override
      public String willUpdate(String key, String newValue) {
        System.out.println(nodeUri() + " willUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue));
        return newValue;
      }

      @Override
      public void didUpdate(String key, String newValue, String oldValue) {
        System.out.println(nodeUri() + " didUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue) + "; oldValue: " + Format.debug(oldValue));
      }
    }
  }

  static class TestJoinMapLaneAgent extends AbstractAgent {
    @SwimLane("join")
    JoinMapLane<String, String, String> testJoinMap = this.<String, String, String>joinMapLane()
        .observe(new TestJoinMapLaneController());

    class TestJoinMapLaneController implements WillUpdateKey<String, String>, DidUpdateKey<String, String> {
      @Override
      public String willUpdate(String key, String newValue) {
        System.out.println(nodeUri() + " willUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue));
        return newValue;
      }

      @Override
      public void didUpdate(String key, String newValue, String oldValue) {
        System.out.println(nodeUri() + " didUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue) + "; oldValue: " + Format.debug(oldValue));
      }
    }

    @Override
    public void didStart() {
      testJoinMap.downlink("xs").hostUri("warp://localhost:53556").nodeUri("/map/xs").laneUri("map").open();
      testJoinMap.downlink("ys").hostUri("warp://localhost:53556").nodeUri("/map/ys").laneUri("map").open();
    }
  }

  static class TestJoinMapPlane extends AbstractPlane {
    @SwimRoute("/map/:name")
    AgentRoute<TestMapLaneAgent> mapRoute;

    @SwimRoute("/join/map/:name")
    AgentRoute<TestJoinMapLaneAgent> joinMapRoute;
  }

  private TestJoinMapPlane plane;
  private Kernel kernel;

  @BeforeMethod
  public void init() {
    kernel = ServerLoader.loadServerStack();
    plane = kernel.openSpace(ActorSpaceDef.fromName("test"))
        .openPlane("test", TestJoinMapPlane.class);

    kernel.openService(WebServiceDef.standard().port(53556).spaceName("test"));
    kernel.start();
  }

  @AfterMethod
  public void close() {
    kernel.stop();
  }

  @Test
  public void testLinkToJoinMapLane() throws InterruptedException {
    // Open downlinks to the two source map lanes and to the join lane that aggregates them.
    final MapDownlink<String, String> xs = getDownlink("/map/xs", "map", null);
    final MapDownlink<String, String> ys = getDownlink("/map/ys", "map", null);
    final MapDownlink<String, String> join = getDownlink("/join/map/all", "join", (WillUpdateKey<String, String>) (key, newValue) -> {
      System.out.println("join link willUpdate key: " + Format.debug(key) + "; newValue: " + Format.debug(newValue));
      return newValue;
    });

    // Write 15000 entries back to back, well above the ~9000 at which the problem appears.
    for (int i = 0; i < 15000; i++) {
      xs.put(Integer.toString(i), Integer.toString(i));
    }

    // Give the updates time to propagate before the kernel is stopped.
    Thread.sleep(5000);
  }

  private MapDownlink<String, String> getDownlink(String nodeUri, String laneUri, Object observer) {
    CountDownLatch didSyncLatch = new CountDownLatch(1);
    MapDownlink<String, String> downlink = plane.downlinkMap()
        .keyClass(String.class)
        .valueClass(String.class)
        .hostUri("warp://localhost:53556/")
        .nodeUri(nodeUri)
        .laneUri(laneUri)
        .didSync(didSyncLatch::countDown);

    if (observer != null) {
      downlink.observe(observer);
    }

    downlink.open();

    try {
      didSyncLatch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
      fail();
    }

    return downlink;
  }

}
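
The example above only prints and sleeps, so a stall has to be spotted by reading the output. A minimal sketch of how the test could fail on its own instead: count the observer callbacks with a latch and wait with a timeout. `UpdateCounter` is a hypothetical helper, not part of Swim, and it assumes each key triggers exactly one callback, which should hold here because every key is written once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Swim): tracks how many expected updates have arrived.
final class UpdateCounter {
  private final CountDownLatch latch;

  UpdateCounter(int expected) {
    this.latch = new CountDownLatch(expected);
  }

  // Call once from the observer callback for every update that arrives.
  void onUpdate() {
    latch.countDown();
  }

  // Number of expected updates that have not arrived yet.
  long missing() {
    return latch.getCount();
  }

  // Waits until all expected updates have arrived or the timeout elapses.
  // Returns false on timeout or if the waiting thread is interrupted.
  boolean awaitAll(long timeout, TimeUnit unit) {
    try {
      return latch.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

In the test, the join link's `willUpdate` lambda would call `counter.onUpdate()` before returning, and the final `Thread.sleep(5000)` would become something like `assertTrue(counter.awaitAll(30, TimeUnit.SECONDS), counter.missing() + " updates never arrived")`, where the 30 second timeout is an arbitrary choice.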
