Skip to content

Commit

Permalink
Merge branch 'dev' of ssh://github.com/shardingjdbc/sharding-jdbc int…
Browse files Browse the repository at this point in the history
…o dev
  • Loading branch information
tristaZero committed Jun 21, 2018
2 parents 54f6f93 + 99ae970 commit 39c762a
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public interface IClient extends IAction, IGroupAction {
*
* @return ZKTransaction
*/
ZKTransaction transaction();
ZKTransaction transaction();
/*
void createNamespace();
void deleteNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void load() throws KeeperException, InterruptedException {
return;
}
try {
if (status == status.RELEASE) {
if (status == PathStatus.RELEASE) {
LOGGER.debug("loading status:{}", status);
this.setStatus(PathStatus.CHANGING);

Expand Down Expand Up @@ -339,7 +339,7 @@ public void put(final String path, final String value) {
LOGGER.debug("cache put:{},value:{}", path, value);
PathUtils.validatePath(path);
LOGGER.debug("put status:{}", status);
if (status == status.RELEASE) {
if (status == PathStatus.RELEASE) {
if (path.equals(rootNode.get().getKey())) {
rootNode.set(new PathNode(rootNode.get().getKey(), value.getBytes(Constants.UTF_8)));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
* </p>
*/

package io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.section;
package io.shardingsphere.jdbc.orchestration.reg.newzk.client.retry;

import io.shardingsphere.jdbc.orchestration.reg.newzk.client.action.IProvider;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.retry.DelayPolicyExecutor;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.retry.DelayRetryPolicy;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.section.Connection;
import lombok.Setter;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,8 +80,6 @@ public void exec() throws KeeperException, InterruptedException {
delayPolicyExecutor.next();
if (Connection.needReset(e)) {
provider.resetConnection();
} else {
throw e;
}
execDelay();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class UsualClient extends BaseClient {
private final boolean watched = true;

@Getter
private io.shardingsphere.jdbc.orchestration.reg.newzk.client.action.IExecStrategy strategy;
private IExecStrategy strategy;

UsualClient(final BaseContext context) {
super(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ public boolean executeOperation() throws KeeperException, InterruptedException {
} catch (KeeperException e) {
if (Connection.needReset(e)) {
provider.resetConnection();
result = false;
} else {
throw e;
}
result = false;
}
if (!result && delayPolicyExecutor.hasNext()) {
delayPolicyExecutor.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ public class Connection {
EXCEPTION_RESETS.put(KeeperException.Code.OPERATIONTIMEOUT.intValue(), false);
}

/**
* need retry.
*
* @param e e
* @return need retry
* @throws KeeperException Zookeeper Exception
*/
public static boolean needRetry(final KeeperException e) throws KeeperException {

This comment has been minimized.

Copy link
@WillemJiang

WillemJiang Jun 23, 2018

Member

This method doesn't throw the KeeperException, please update the java doc.

This comment has been minimized.

Copy link
@terrymanu

terrymanu Jun 23, 2018

Member

Thank you for suggestion, updated.

return EXCEPTION_RESETS.containsKey(e.code().intValue());
}

/**
* need reset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.operation.DeleteCurrentBranchOperation;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.operation.DeleteCurrentOperation;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.operation.UpdateOperation;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.section.Connection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
Expand All @@ -50,9 +51,13 @@ public void createCurrentOnly(final String key, final String value, final Create
String path = getProvider().getRealPath(key);
try {
getProvider().create(path, value, createMode);
} catch (KeeperException.SessionExpiredException e) {
LOGGER.warn("AsyncRetryStrategy SessionExpiredException createCurrentOnly:{}", path);
AsyncRetryCenter.INSTANCE.add(new CreateCurrentOperation(getProvider(), path, value, createMode));
} catch (KeeperException e) {
if (Connection.needRetry(e)) {

This comment has been minimized.

Copy link
@WillemJiang

WillemJiang Jun 23, 2018

Member

Lots of duplicated code, we could put these code into a method.
BTW, it's better to log the exception, otherwise it's hard for the operator to trace the exception.

This comment has been minimized.

Copy link
@terrymanu

terrymanu Jun 23, 2018

Member

It maybe difficult to abstract exception process in one method, because every exception need a different operation for ADD/UPDATE/DELETE.
I just log exception for now

LOGGER.warn("AsyncRetryStrategy SessionExpiredException createCurrentOnly:{}", path);
AsyncRetryCenter.INSTANCE.add(new CreateCurrentOperation(getProvider(), path, value, createMode));
} else {
throw e;
}
}
}

Expand All @@ -61,9 +66,13 @@ public void update(final String key, final String value) throws KeeperException,
String path = getProvider().getRealPath(key);
try {
getProvider().update(path, value);
} catch (KeeperException.SessionExpiredException e) {
LOGGER.warn("AsyncRetryStrategy SessionExpiredException update:{}", path);
AsyncRetryCenter.INSTANCE.add(new UpdateOperation(getProvider(), path, value));
} catch (KeeperException e) {
if (Connection.needRetry(e)) {
LOGGER.warn("AsyncRetryStrategy SessionExpiredException update:{}", path);
AsyncRetryCenter.INSTANCE.add(new UpdateOperation(getProvider(), path, value));
} else {
throw e;
}
}
}

Expand All @@ -72,39 +81,55 @@ public void deleteOnlyCurrent(final String key) throws KeeperException, Interrup
String path = getProvider().getRealPath(key);
try {
getProvider().delete(path);
} catch (KeeperException.SessionExpiredException e) {
LOGGER.warn("AsyncRetryStrategy SessionExpiredException deleteOnlyCurrent:{}", path);
AsyncRetryCenter.INSTANCE.add(new DeleteCurrentOperation(getProvider(), path));
} catch (KeeperException e) {
if (Connection.needRetry(e)) {
LOGGER.warn("AsyncRetryStrategy SessionExpiredException deleteOnlyCurrent:{}", path);
AsyncRetryCenter.INSTANCE.add(new DeleteCurrentOperation(getProvider(), path));
} else {
throw e;
}
}
}

@Override
public void createAllNeedPath(final String key, final String value, final CreateMode createMode) throws KeeperException, InterruptedException {
try {
super.createAllNeedPath(key, value, createMode);
} catch (KeeperException.SessionExpiredException e) {
LOGGER.warn("AllAsyncRetryStrategy SessionExpiredException CreateAllNeedOperation:{}", key);
AsyncRetryCenter.INSTANCE.add(new CreateAllNeedOperation(getProvider(), key, value, createMode));
} catch (KeeperException e) {
if (Connection.needRetry(e)) {
LOGGER.warn("AllAsyncRetryStrategy SessionExpiredException CreateAllNeedOperation:{}", key);
AsyncRetryCenter.INSTANCE.add(new CreateAllNeedOperation(getProvider(), key, value, createMode));
} else {
throw e;
}
}
}

@Override
public void deleteAllChildren(final String key) throws KeeperException, InterruptedException {
try {
super.deleteAllChildren(key);
} catch (KeeperException.SessionExpiredException e) {
LOGGER.warn("AllAsyncRetryStrategy SessionExpiredException deleteAllChildren:{}", key);
AsyncRetryCenter.INSTANCE.add(new DeleteAllChildrenOperation(getProvider(), key));
} catch (KeeperException e) {
if (Connection.needRetry(e)) {
LOGGER.warn("AllAsyncRetryStrategy SessionExpiredException deleteAllChildren:{}", key);
AsyncRetryCenter.INSTANCE.add(new DeleteAllChildrenOperation(getProvider(), key));
} else {
throw e;
}
}
}

@Override
public void deleteCurrentBranch(final String key) throws KeeperException, InterruptedException {
try {
super.deleteCurrentBranch(key);
} catch (KeeperException.SessionExpiredException e) {
LOGGER.warn("AllAsyncRetryStrategy SessionExpiredException deleteCurrentBranch:{}", key);
AsyncRetryCenter.INSTANCE.add(new DeleteCurrentBranchOperation(getProvider(), key));
} catch (KeeperException e) {
if (Connection.needRetry(e)) {
LOGGER.warn("AllAsyncRetryStrategy SessionExpiredException deleteCurrentBranch:{}", key);
AsyncRetryCenter.INSTANCE.add(new DeleteCurrentBranchOperation(getProvider(), key));
} else {
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.shardingsphere.jdbc.orchestration.reg.newzk.client.action.IProvider;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.retry.DelayRetryPolicy;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.zookeeper.section.Callable;
import io.shardingsphere.jdbc.orchestration.reg.newzk.client.retry.Callable;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.zookeeper.CreateMode;
Expand Down

0 comments on commit 39c762a

Please sign in to comment.