|
|
|
@ -362,45 +362,63 @@ public abstract class AbstractConnectionManager extends ControlledRunnable {
@@ -362,45 +362,63 @@ public abstract class AbstractConnectionManager extends ControlledRunnable {
|
|
|
|
|
|
|
|
|
|
selectedKeys = this.selector.selectedKeys().iterator(); |
|
|
|
|
|
|
|
|
|
if (selectedKeys.hasNext() == false) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
// No need for the "if selectedKeys.hasNext() == false" check here
|
|
|
|
|
while (selectedKeys.hasNext()) { |
|
|
|
|
thisKey = selectedKeys.next(); |
|
|
|
|
|
|
|
|
|
//To shake out any issues
|
|
|
|
|
if (thisKey.attachment() == null) |
|
|
|
|
// To shake out any issues, logging null attachments
|
|
|
|
|
if (thisKey.attachment() == null) { |
|
|
|
|
Logger.error("Found null attachment! PANIC!"); |
|
|
|
|
selectedKeys.remove(); // Safely remove invalid keys from the set
|
|
|
|
|
continue; // Skip to the next key
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (thisKey.attachment() instanceof AbstractConnection) |
|
|
|
|
if (((AbstractConnection) thisKey.attachment()).execTask.get() == true) |
|
|
|
|
continue; |
|
|
|
|
|
|
|
|
|
selectedKeys.remove(); |
|
|
|
|
// Check for task execution state and continue if already running
|
|
|
|
|
if (thisKey.attachment() instanceof AbstractConnection) { |
|
|
|
|
AbstractConnection connection = (AbstractConnection) thisKey.attachment(); |
|
|
|
|
if (connection.execTask.get()) { |
|
|
|
|
selectedKeys.remove(); // Safely remove the key from selected keys
|
|
|
|
|
continue; // Skip to the next key
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
if (thisKey.isValid() == false) |
|
|
|
|
break; // Changed from continue
|
|
|
|
|
else if (thisKey.isAcceptable()) |
|
|
|
|
// Validity check for the key, move the removal after key validation
|
|
|
|
|
if (!thisKey.isValid()) { |
|
|
|
|
Logger.warn("Invalid SelectionKey found. Skipping."); |
|
|
|
|
selectedKeys.remove(); // Remove invalid keys after validity check
|
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Process the appropriate operation based on the key state
|
|
|
|
|
if (thisKey.isAcceptable()) { |
|
|
|
|
this.acceptNewConnection(thisKey); |
|
|
|
|
else if (thisKey.isReadable()) |
|
|
|
|
} else if (thisKey.isReadable()) { |
|
|
|
|
jm.submitJob(new ReadOperationHander(thisKey)); |
|
|
|
|
else if (thisKey.isWritable()) |
|
|
|
|
} else if (thisKey.isWritable()) { |
|
|
|
|
jm.submitJob(new WriteOperationHander(thisKey)); |
|
|
|
|
else if (thisKey.isConnectable()) |
|
|
|
|
} else if (thisKey.isConnectable()) { |
|
|
|
|
this.finishConnectingTo(thisKey); |
|
|
|
|
else |
|
|
|
|
} else { |
|
|
|
|
Logger.error("Unhandled keystate: " + thisKey.toString()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Safely remove processed key
|
|
|
|
|
selectedKeys.remove(); |
|
|
|
|
|
|
|
|
|
} catch (CancelledKeyException cke) { |
|
|
|
|
Logger.error(this.getLocalNodeName(), cke); |
|
|
|
|
this.disconnect(thisKey); |
|
|
|
|
selectedKeys.remove(); // Safely remove cancelled key
|
|
|
|
|
|
|
|
|
|
} catch (IOException e) { |
|
|
|
|
Logger.error(this.getLocalNodeName(), e); |
|
|
|
|
selectedKeys.remove(); // Safely remove on error
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected void connectTo(String host, int port) { |
|
|
|
|
try { |
|
|
|
|
this.connectTo(InetAddress.getByName(host), port); |
|
|
|
|