Skip to content

Commit

Permalink
[SPARK-51023] log remote address on RPC exception
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksii.diagiliev committed Jan 29, 2025
1 parent 6bbfa2d commit e033428
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ public void onFailure(Throwable e) {
}
});
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id {}", e,
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId));
logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
} finally {
req.body().release();
Expand Down Expand Up @@ -262,8 +263,9 @@ public String getID() {
respond(new RpcResponse(req.requestId,
new NioManagedBuffer(blockPushNonFatalFailure.getResponse())));
} else {
logger.error("Error while invoking RpcHandler#receive() on RPC id {}", e,
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId));
logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
// We choose to totally fail the channel, rather than trying to recover as we do in other
Expand All @@ -279,7 +281,8 @@ private void processOneWayMessage(OneWayMessage req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
logger.error("Error while invoking RpcHandler#receive() for one-way message from {}.", e,
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
} finally {
req.body().release();
}
Expand All @@ -304,9 +307,10 @@ public void onFailure(Throwable e) {
});
} catch (Exception e) {
logger.error("Error while invoking receiveMergeBlockMetaReq() for appId {} shuffleId {} "
+ "reduceId {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId),
+ "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId),
MDC.of(LogKeys.SHUFFLE_ID$.MODULE$, req.shuffleId),
MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId));
MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
}
Expand Down

0 comments on commit e033428

Please sign in to comment.