作者:京東物流 張小龍
本文通過(guò)介紹分布式應(yīng)用下各個(gè)場(chǎng)景的全局日志ID透?jìng)魉悸罚约敖榻B分布式日志追蹤ID簡(jiǎn)單實(shí)現(xiàn)原理和實(shí)戰(zhàn)效果,從而達(dá)到通過(guò)提高日志查詢排查問(wèn)題的效率。
背景
開(kāi)發(fā)排查系統(tǒng)問(wèn)題用得最多的手段就是查看系統(tǒng)日志,相信不少人都值過(guò)班當(dāng)過(guò)小秘吧:給下接口和出入?yún)?,麻煩看看日志里的有沒(méi)有異常信息啊等等,但是在并發(fā)大時(shí)使用日志定位問(wèn)題還是比較麻煩,由于大量的其他用戶/其他線程的日志也一起輸出穿行其中導(dǎo)致很難篩選出指定請(qǐng)求的全部相關(guān)日志,以及下游線程/服務(wù)對(duì)應(yīng)的日志,甚至一些特殊場(chǎng)景的出入?yún)⒅淮蛴×艘恍┲T如gis坐標(biāo)、四級(jí)地址等沒(méi)有單據(jù)信息的日志,使得日志定位起來(lái)非常不便
場(chǎng)景分析
自己所在組負(fù)責(zé)的系統(tǒng)主要是web應(yīng)用,其中涉及到的請(qǐng)求方式主要有:springmvc的servlet的http場(chǎng)景、jsf場(chǎng)景、MQ場(chǎng)景、resteasy場(chǎng)景、clover場(chǎng)景、easyjob場(chǎng)景,每一種場(chǎng)景都需要不同的方式進(jìn)行l(wèi)ogTraceId的透?jìng)?,接下?lái)逐個(gè)探析上述各個(gè)場(chǎng)景的透?jìng)鞣桨浮?/p>
在這之前我們先要簡(jiǎn)單了解一下日志中透?jìng)骱痛蛴ogTraceId的方式,一般我們使用MDC進(jìn)行l(wèi)ogTraceId的透?jìng)髋c打印,但是基于MDC內(nèi)部使用的是ThreadLocal所以只有本線程才有效,子線程服務(wù)的MDC里的值會(huì)丟失,所以這里我們要么是在所有涉及到父子線程的地方以編碼侵入式自行實(shí)現(xiàn)值的傳遞,要么就是通過(guò)覆寫(xiě)MDCAdapter:通過(guò)阿里的TransmittableThreadLocal來(lái)解決父子線程傳遞問(wèn)題,而本文采用的是比較粗糙地以編碼侵入式來(lái)解決此問(wèn)題。
springmvc的servlet的http場(chǎng)景
這個(gè)場(chǎng)景相信大家都已經(jīng)爛熟到骨子里了,主要思路是通過(guò)攔截器的方式進(jìn)行l(wèi)ogTraceId的透?jìng)鳎陆ㄒ粋€(gè)類實(shí)現(xiàn)HandlerInterceptor
preHandle:在業(yè)務(wù)處理器處理請(qǐng)求之前被調(diào)用,這里實(shí)現(xiàn)logTraceId的設(shè)置與透?jìng)?/p>
postHandle:在業(yè)務(wù)處理器處理請(qǐng)求執(zhí)行完成后,生成視圖之前執(zhí)行,這里空實(shí)現(xiàn)就好
afterCompletion:在DispatcherServlet完全處理完請(qǐng)求后被調(diào)用,這里用于清除MDC的logTraceId
@Slf4j
public class TraceInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception {
try{
String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if (StringUtils.isBlank(traceId)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId());
}
}catch (RuntimeException e){
log.error("mvc自定義log跟蹤攔截器執(zhí)行異常",e);
}
return true;
}
@Override
public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("mvc自定義log跟蹤攔截器執(zhí)行異常",ex);
}
}
}
jsf場(chǎng)景
相信大家對(duì)于jsf并不陌生,而jsf也支持自定義filter,基于jsf過(guò)濾器的運(yùn)行方式,可以通過(guò)配置全局過(guò)濾器(繼承AbstractFilter)的方式進(jìn)行l(wèi)ogTraceId的透?jìng)?,需要注意的是jsf是在線程池中執(zhí)行的所以一定要信任消息體中的logTraceId
jsf消費(fèi)者過(guò)濾器:主要從上下文環(huán)境中獲取logTraceId并進(jìn)行透?jìng)鳎瑢?shí)現(xiàn)代碼如下
@Slf4j
public class TraceIdGlobalJsfFilter extends AbstractFilter {
@Override
public ResponseMessage invoke(RequestMessage requestMessage) {
//設(shè)置traceId
setAndGetTraceId(requestMessage);
try{
return this.getNext().invoke(requestMessage);
}finally {
}
}
/**
* 設(shè)置并返回traceId
* @param requestMessage
* @return
*/
private void setAndGetTraceId(RequestMessage requestMessage) {
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
//如果filter和MDC都沒(méi)有獲取到則說(shuō)明有遺漏,打印日志
if(log.isDebugEnabled()){
log.debug("jsf消費(fèi)者自定義log跟蹤攔截器預(yù)警,filter和MDC都沒(méi)有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
} else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
//如果MDC沒(méi)有,filter有,打印日志
if(log.isDebugEnabled()){
log.debug("jsf消費(fèi)者自定義log跟蹤攔截器預(yù)警,MDC沒(méi)有filter有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
} else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
//如果MDC有,filter沒(méi)有,說(shuō)明是源頭已經(jīng)有了,但是jsf是第一次調(diào),透?jìng)? requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId);
}else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null){
//MDC和fitler都有,但是并不相等,則存在問(wèn)題打印日志
if(log.isDebugEnabled()){
log.debug("jsf消費(fèi)者自定義log跟蹤攔截器預(yù)警,MDC和filter都有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
}
}catch (RuntimeException e){
log.error("jsf消費(fèi)者自定義log跟蹤攔截器執(zhí)行異常",e);
}
}
}
jsf提供者過(guò)濾器:通過(guò)拿到消費(fèi)者在消息體中透?jìng)鞯膌ogTraceId來(lái)實(shí)現(xiàn),實(shí)現(xiàn)代碼如下
@Slf4j
public class TraceIdGlobalJsfProducerFilter extends AbstractFilter {
@Override
public ResponseMessage invoke(RequestMessage requestMessage) {
//設(shè)置traceId
boolean isNeedClearMdc = transferTraceId(requestMessage);
try{
return this.getNext().invoke(requestMessage);
}finally {
if(isNeedClearMdc){
clear();
}
}
}
/**
* 設(shè)置并返回traceId
* @param requestMessage
* @return
*/
private boolean transferTraceId(RequestMessage requestMessage) {
boolean isNeedClearMdc = false;
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId) && logTraceIdObj == null){
//如果filter和MDC都沒(méi)有獲取到,說(shuō)明存在遺漏場(chǎng)景或是提供給外部系統(tǒng)調(diào)用的接口,打印日志進(jìn)行觀察
String traceId = TraceUtils.getTraceId();
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId);
requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId);
if(log.isDebugEnabled()){
log.debug("jsf生產(chǎn)者自定義log跟蹤攔截器預(yù)警,filter和MDC都沒(méi)有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
isNeedClearMdc = true;
} else if(StringUtils.isBlank(logTraceId) && logTraceIdObj != null) {
//如果MDC沒(méi)有,filter有,說(shuō)明是被調(diào)用方,需要透?jìng)飨氯? MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString());
isNeedClearMdc = true;
} else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj == null){
//如果MDC有,filter沒(méi)有,存在問(wèn)題,打印日志
if(log.isDebugEnabled()){
log.debug("jsf生產(chǎn)者自定義log跟蹤攔截器預(yù)警,MDC有filter沒(méi)有traceId,jsf信息:{}", JSON.toJSONString(requestMessage));
}
isNeedClearMdc = true;
}else if(StringUtils.isNotBlank(logTraceId) && logTraceIdObj != null && !logTraceId.equals(logTraceIdObj.toString())){
//MDC和fitler都有,但是并不相等,則信任filter透?jìng)鹘Y(jié)果
TraceUtils.resetTraceId(logTraceIdObj.toString());
if(log.isDebugEnabled()){
log.debug("jsf生產(chǎn)者自定義log跟蹤攔截器預(yù)警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}", JSON.toJSONString(requestMessage));
}
}
return isNeedClearMdc;
}catch (RuntimeException e){
log.error("jsf生產(chǎn)者自定義log跟蹤攔截器執(zhí)行異常",e);
return false;
}
}
/**
* 清除MDC
*/
private void clear() {
try{
MDC.clear();
}catch (RuntimeException e){
log.error("jsf生產(chǎn)者自定義log跟蹤攔截器執(zhí)行異常",e);
}
}
}
MQ場(chǎng)景
說(shuō)到MQ相信大家對(duì)于此就更不陌生了,此種場(chǎng)景主要通過(guò)在提供者發(fā)送消息時(shí)拿到上下文中的logTraceId,將其以擴(kuò)展信息的方式設(shè)置進(jìn)消息體中進(jìn)行透?jìng)鳎M(fèi)者則從消息體中進(jìn)行獲取
生產(chǎn)者:新建一個(gè)抽象類繼承MessageProducer,覆寫(xiě)父類中的兩個(gè)send方法(批量發(fā)送、單條發(fā)送),send方法中主要調(diào)用抽象加工消息體的方法(logTraceId屬性賦值)和日志打印,在子類中進(jìn)行發(fā)送前對(duì)消息體的加工處理,具體代碼如下
@Slf4j
public abstract class BaseTraceIdProducer extends MessageProducer {
private static final String SEPARATOR_COMMA = ",";
public BaseTraceIdProducer() {
}
public BaseTraceIdProducer(TransportManager transportManager) {
super(transportManager);
}
/**
* 獲取消息體-單個(gè)
* @param messageContext
* @return
*/
protected abstract Message getMessage(MessageContext messageContext);
/** 獲取消息體-批量
*
* @param messageContext
* @return
*/
protected abstract List getMessages(MessageContext messageContext);
/**
* 填充消息體上下文信息
* @param message
* @param messageContext
*/
protected void fillContext(Message message,MessageContext messageContext) {
if(message == null){
return;
}
if(StringUtils.isBlank(messageContext.getLogTraceId())){
String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
messageContext.setLogTraceId(logTraceId);
}
if(StringUtils.isBlank(messageContext.getTopic())){
String topic = message.getTopic();
messageContext.setTopic(topic);
}
String businessId = message.getBusinessId();
messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId);
}
/**
* traceId嵌入消息體中
* @param message
*/
protected void generateTraceIdIntoMessage(Message message){
if(message == null){
return;
}
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId)){
logTraceId = TraceUtils.getTraceId();
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
}
message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId);
}catch (RuntimeException e){
log.error("jmq2自定義log跟蹤攔截器執(zhí)行異常",e);
}
}
/**
* 批量發(fā)送消息-無(wú)回調(diào)
* @param messages
* @param timeout
* @throws JMQException
*/
public void send(List messages, int timeout) throws JMQException {
MessageContext messageContext = new MessageContext();
messageContext.setMessages(messages);
List messageList = this.getMessages(messageContext);
//打印日志,方便排查問(wèn)題
printLog(messageContext);
super.send(messageList, timeout);
}
/**
* 單個(gè)發(fā)送消息
* @param message
* @param transaction
* @param
* @return
* @throws JMQException
*/
public T send(Message message, LocalTransaction transaction) throws JMQException {
MessageContext messageContext = new MessageContext();
messageContext.setMessage(message);
Message msg = this.getMessage(messageContext);
//打印日志,方便排查問(wèn)題
printLog(messageContext);
return super.send(msg, transaction);
}
/**
* 批量發(fā)送消息-有回調(diào)
* @param messages
* @param timeout
* @param callback
* @throws JMQException
*/
public void send(List messages, int timeout, AsyncSendCallback callback) throws JMQException {
MessageContext messageContext = new MessageContext();
messageContext.setMessages(messages);
List messageList = this.getMessages(messageContext);
//打印日志,方便排查問(wèn)題
printLog(messageContext);
super.send(messageList, timeout, callback);
}
/**
* 打印日志,方便排查問(wèn)題
* @param messageContext
*/
private void printLog(MessageContext messageContext) {
if(messageContext==null){
return;
}
if(log.isInfoEnabled()){
log.info("MQ發(fā)送:traceId:{},topic:{},businessIds:[{}]",messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf()==null?"":messageContext.getBusinessIdBuf().toString());
}
}
}
@Slf4j
public class TraceIdEnvMessageProducer extends BaseTraceIdProducer {
private static final String UAT_TRUE = String.valueOf(true);
private boolean uat = false;
public TraceIdEnvMessageProducer() {
}
public TraceIdEnvMessageProducer(TransportManager transportManager) {
super(transportManager);
}
/**
* 環(huán)境變量打標(biāo)-單個(gè)消息體
* @param message
*/
private void convertUatMessage(Message message) {
if (message != null) {
message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE);
}
}
/**
* 消息轉(zhuǎn)換-批量消息體
* @param messageContext
* @return
*/
private List convertMessages(MessageContext messageContext) {
List messages = messageContext.getMessages();
if (!CollectionUtils.isEmpty(messages)) {
Iterator messageIterator = messages.iterator();
while(messageIterator.hasNext()) {
Message message = (Message)messageIterator.next();
if(this.isUat()){
this.convertUatMessage(message);
}
super.generateTraceIdIntoMessage(message);
super.fillContext(message,messageContext);
}
}
return messageContext.getMessages();
}
/**
* 消息轉(zhuǎn)換-單個(gè)消息體
* @param messageContext
* @return
*/
private Message convertMessage(MessageContext messageContext){
Message message = messageContext.getMessage();
if(this.isUat()){
this.convertUatMessage(message);
}
super.generateTraceIdIntoMessage(message);
super.fillContext(message,messageContext);
return message;
}
protected Message getMessage(MessageContext messageContext) {
if(log.isDebugEnabled()){
log.debug("current environment is UAT : {}", this.isUat());
}
return this.convertMessage(messageContext);
}
protected List getMessages(MessageContext messageContext) {
if(log.isDebugEnabled()){
log.debug("current environment is UAT : {}", this.isUat());
}
return this.convertMessages(messageContext);
}
public void setUat(boolean uat) {
this.uat = uat;
}
boolean isUat() {
return this.uat;
}
}
消費(fèi)者:新建一個(gè)抽象類繼承MessageListener,覆寫(xiě)父類中的onMessage方法,主要進(jìn)行設(shè)置日志traceId和消費(fèi)完成后的traceId清理等,而在子類中進(jìn)行一些自定義處理,具體代碼如下
@Slf4j
public abstract class BaseTraceIdMessageListener implements MessageListener {
public BaseTraceIdMessageListener() {
}
public abstract void onMessageList(List messages) throws Exception;
@Override
public final void onMessage(List messages) throws Exception {
try{
if(CollectionUtils.isEmpty(messages)){
return;
}
//設(shè)置日志traceId
setLogTraceId(messages);
this.onMessageList(messages);
//消費(fèi)完后清除traceId
clear();
}catch (Exception e){
throw e;
}finally {
MDC.clear();
}
}
/**
* 設(shè)置日志traceId
* @param messages
*/
private void setLogTraceId(List messages) {
try{
Message message = messages.get(0);
String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId)){
logTraceId = TraceUtils.getTraceId();
}
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId);
}catch (RuntimeException e){
log.error("jmq2自定義log跟蹤攔截器執(zhí)行異常",e);
}
}
/**
* 清除traceId
*/
private void clear() {
try{
MDC.clear();
}catch (RuntimeException e){
log.error("jmq2自定義log跟蹤攔截器執(zhí)行異常",e);
}
}
}
@Slf4j
public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{
private String uat;
public TraceIdEnvMessageListener() {
}
public abstract void onMessages(List var1) throws Exception;
@Override
public void onMessageList(List messages) throws Exception {
Iterator iterator;
Message message;
if (this.getUat() != null && Boolean.valueOf(this.getUat())) {
iterator = messages.iterator();
while(true) {
while(iterator.hasNext()) {
message = (Message)iterator.next();
if (message != null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
this.onMessages(Arrays.asList(message));
} else {
log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
}
}
return;
}
} else if (this.getUat() != null && !Boolean.valueOf(this.getUat())) {
iterator = messages.iterator();
while(true) {
while(iterator.hasNext()) {
message = (Message)iterator.next();
if (message != null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) {
this.onMessages(Arrays.asList(message));
} else {
log.debug("Ignore message: [BusinessId: {}, Text: {}]", message.getBusinessId(), message.getText());
}
}
return;
}
} else {
this.onMessages(messages);
}
}
public void setUat(String uat) {
if (!"true".equals(uat) && !"false".equals(uat)) {
throw new IllegalArgumentException("uat 屬性值只能為 true 或 false.");
} else {
this.uat = uat;
}
}
public String getUat() {
return this.uat;
}
}
resteasy場(chǎng)景
此場(chǎng)景類似于spinrg-mvc場(chǎng)景,也是http請(qǐng)求,需要通過(guò)攔截器在消息頭中進(jìn)行l(wèi)ogTraceId的透?jìng)?,主要有客戶端攔截器,服務(wù)端:預(yù)處理攔截器、后置攔截器,代碼如下
@ClientInterceptor
@Provider
@Slf4j
public class ResteasyClientInterceptor implements ClientExecutionInterceptor {
@Override
public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception {
try{
String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
ClientRequest request = clientExecutionContext.getRequest();
String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
if(StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){
//如果filter和MDC都沒(méi)有獲取到則說(shuō)明是調(diào)用源頭
String traceId = TraceUtils.getTraceId();
TraceUtils.resetTraceId(traceId);
request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId);
} else if(StringUtils.isBlank(headerTraceId)){
//如果MDC有但是filter沒(méi)有則需要傳遞
request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId);
}
}catch (RuntimeException e){
log.error("resteasy客戶端log跟蹤攔截器執(zhí)行異常",e);
}
return clientExecutionContext.proceed();
}
}
@Slf4j
@Provider
@ServerInterceptor
public class RestEasyPreInterceptor implements PreProcessInterceptor {
@Override
public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException {
try{
MultivaluedMap requestHeaders = request.getHttpHeaders().getRequestHeaders();
String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY);
if(StringUtils.isNotBlank(headerTraceId)){
//如果filter則透?jìng)? TraceUtils.resetTraceId(headerTraceId);
}
}catch (RuntimeException e){
log.error("resteasy服務(wù)端log跟蹤前置攔截器執(zhí)行異常",e);
}
return null;
}
}
@Slf4j
@Provider
@ServerInterceptor
public class ResteasyPostInterceptor implements PostProcessInterceptor {
@Override
public void postProcess(ServerResponse serverResponse) {
try{
MDC.clear();
}catch (RuntimeException e){
log.error("resteasy服務(wù)端log跟蹤后置攔截器執(zhí)行異常",e);
}
}
}
clover場(chǎng)景
clover的大體機(jī)制主要是在項(xiàng)目啟動(dòng)的時(shí)候掃描到帶有注解@HessianWebService的類進(jìn)行服務(wù)注冊(cè)并維持心跳檢測(cè),而clover端則通過(guò)servlet請(qǐng)求方式進(jìn)行任務(wù)的回調(diào),同時(shí)繼承AbstractScheduleTaskProcess方式的任務(wù)是以線程池的方式進(jìn)行業(yè)務(wù)的處理
基于上述原理我們需要解決兩個(gè)問(wèn)題:1.新建一個(gè)類繼承ServiceExporterServlet,并在web.xml配置中進(jìn)行servlet配置,代碼如下;
@Slf4j
public class ServiceExporterTraceIdServlet extends ServiceExporterServlet {
@Override
public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
try {
String traceId = MDC.get("traceId");
if (StringUtils.isBlank(traceId)) {
MDC.put("traceId", TraceUtils.getTraceId());
}
} catch (Exception e) {
log.error("clover請(qǐng)求servlet執(zhí)行異常", e);
}
try {
super.service(req, res);
} catch (Throwable e) {
log.error("clover請(qǐng)求servlet執(zhí)行異常", e);
throw e;
}finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("clover請(qǐng)求servlet執(zhí)行異常",ex);
}
}
}
}
2.新建一個(gè)抽象類繼承AbstractScheduleTaskProcess,在類中以編碼形式進(jìn)行父子線程的透?jìng)鳎蓛?yōu)化:通過(guò)覆寫(xiě)MDCAdapter:通過(guò)阿里的TransmittableThreadLocal來(lái)解決父子線程傳遞問(wèn)題),所有任務(wù)均改為繼承此類,關(guān)鍵代碼如下
try{
traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if (StringUtils.isBlank(traceId)) {
log.warn("clover自定義log跟蹤攔截器預(yù)警,mdc沒(méi)有traceId");
}
}catch (RuntimeException e){
log.error("clover自定義log跟蹤攔截器執(zhí)行異常",e);
}
final String logTraceId = traceId;
while(iterator.hasNext()) {
final List list = (List)iterator.next();
this.executor.submit(new Callable() {
public Object call() throws Exception {
try{
if (StringUtils.isNotBlank(logTraceId)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId);
}
}catch (RuntimeException e){
log.error("clover自定義log跟蹤攔截器執(zhí)行異常",e);
}
Object var1;
try {
if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
BaseTcTaskProcessWorker.logger.info("正在執(zhí)行任務(wù)[" + this.getClass().getName() + "],條數(shù):" + list.size() + "...");
}
BaseTcTaskProcessWorker.this.executeTasks(list);
if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) {
BaseTcTaskProcessWorker.logger.info("執(zhí)行任務(wù)[" + this.getClass().getName() + "],條數(shù):" + list.size() + "成功!");
}
var1 = null;
} catch (Exception var5) {
BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5);
throw var5;
} finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("clover自定義log跟蹤攔截器執(zhí)行異常",ex);
}
latch.countDown();
}
return var1;
}
});
}
easyjob場(chǎng)景
easyjob的大體機(jī)制是在項(xiàng)目啟動(dòng)的時(shí)候通過(guò)掃描實(shí)現(xiàn)接口Scheduler的類進(jìn)行上報(bào)注冊(cè),同時(shí)啟動(dòng)一個(gè)acceptor(獲取任務(wù)的線程池),而acceptor拉取到任務(wù)后會(huì)將父任務(wù)放進(jìn)一個(gè)叫executor的線程池,子任務(wù)范進(jìn)一個(gè)叫slowExecutor的線程池,我們可以新建一個(gè)抽獎(jiǎng)?lì)悓?shí)現(xiàn)接口ScheduleFlowTask,復(fù)用clover場(chǎng)景硬編碼方式進(jìn)行父子線程logTraceId的透?jìng)魈幚恚蓛?yōu)化:通過(guò)覆寫(xiě)MDCAdapter:通過(guò)阿里的TransmittableThreadLocal來(lái)解決父子線程傳遞問(wèn)題),示例代碼如下
?
@Slf4j
public abstract class AbstractEasyjobOnlyScheduleProcess implements ScheduleFlowTask {
/**
* EASYJOB平臺(tái)UMP監(jiān)控key前綴
*/
private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask.";
/**
* EASYJOB單個(gè)任務(wù)處理分布式鎖前綴
*/
private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_";
/**
* 環(huán)境標(biāo)識(shí)-開(kāi)關(guān)配置進(jìn)行環(huán)境隔離
*/
@Value("${spring.profiles.active}")
private String activeEnv;
@Value("${task.scene.mark}")
private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc();
/**
* easyJob維度線程池變量
*/
private ThreadPoolExecutor easyJobExecutor;
/**
* easyJob維度服務(wù)器個(gè)數(shù)-分片個(gè)數(shù)
*/
private volatile int easyJobLastThreadCount = 0;
/**
* easyjob多線程名稱
*/
private static final String EASYJOB_THREAD_NAME = "dts.easyJobs";
/**
* 子類的泛型參數(shù)類型
*/
private Class argumentType;
/**
* 無(wú)參構(gòu)造
*/
public AbstractEasyjobOnlyScheduleProcess() {
//設(shè)置子類泛型參數(shù)類型
argumentType = this.getArgumentType();
}
@Autowired
private RedisHelper redisHelper;
/**
* 非task表掃描待處理的任務(wù)數(shù)據(jù)
* @param taskServerParam
* @param curServer
* @return
*/
protected abstract List loadTasks(TaskServerParam taskServerParam, int curServer);
/**
* 業(yè)務(wù)處理抽象方法-單個(gè)
* @param task
*/
protected abstract void doSingleTask(T task);
/**
* 業(yè)務(wù)處理抽象方法-批量
* @param tasks
*/
protected abstract void doBatchTasks(List tasks);
/**
* 拼裝ump監(jiān)控key
* @param prefix
* @param taskNameKey
* @return
*/
private String getUmpKey(String prefix,String taskNameKey) {
StringBuffer umpKeyBuf = new StringBuffer();
umpKeyBuf.append(prefix).append(taskNameKey);
return umpKeyBuf.toString();
}
/**
* easyjob平臺(tái)異步任務(wù)回調(diào)方法
* @param scheduleContext
* @return
* @throws Exception
*/
@Override
public TaskResult doTask(ScheduleContext scheduleContext) throws Exception {
String requestNo = TraceUtils.getTraceId();
try {
String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY);
if (StringUtils.isBlank(traceId)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
}
} catch (Exception e) {
log.error("easyjob執(zhí)行異常", e);
}
EasyJobTaskServerParam taskServerParam = null;
CallerInfo callerinfo = null;
try {
//條件轉(zhuǎn)換
taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext);
String taskNameKey = getTaskNameKey();
String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey);
callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true);
//多服務(wù)器,并且非子任務(wù),本次不執(zhí)行,提交子任務(wù)
if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) {
submitSubTask(scheduleContext, taskServerParam,requestNo);
return TaskResult.success();
}
if (log.isInfoEnabled()) {
log.info("請(qǐng)求編號(hào)[{}],開(kāi)始獲取任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}],執(zhí)行參數(shù)[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam));
}
TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam);
List tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer());
if (log.isInfoEnabled()) {
log.info("請(qǐng)求編號(hào)[{}],獲取任務(wù)ID[{}],任務(wù)名稱[{}]共{}條", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks == null ? 0 : tasks.size());
}
if (CollectionUtils.isNotEmpty(tasks)) {
if (log.isInfoEnabled()) {
log.info("請(qǐng)求編號(hào)[{}],開(kāi)始執(zhí)行任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}]", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName());
}
this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo);
if (log.isInfoEnabled()) {
log.info("請(qǐng)求編號(hào)[{}],執(zhí)行任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}],執(zhí)行數(shù)量[{}]完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size());
}
}
return TaskResult.success();
} catch (Exception e) {
Profiler.functionError(callerinfo);
if (log.isInfoEnabled()) {
log.error("請(qǐng)求編號(hào)[{}],任務(wù)執(zhí)行失敗,任務(wù)ID[{}],任務(wù)名稱[{}]", requestNo, taskServerParam == null ? "" : taskServerParam.getTaskId(), taskServerParam == null ? "" :taskServerParam.getTaskName(), e);
}
return TaskResult.fail(e.getMessage());
}finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("easyjob執(zhí)行異常",ex);
}
Profiler.registerInfoEnd(callerinfo);
}
}
/**
* 多分片提交子任務(wù)
* @param scheduleContext 調(diào)度任務(wù)上下文參數(shù)
* @param taskServerParam 調(diào)度任務(wù)參數(shù)
* @param requestNo 調(diào)度任務(wù)參數(shù)
* @return void
*/
private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException {
log.info("請(qǐng)求編號(hào)[{}],執(zhí)行任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}],子任務(wù)個(gè)數(shù)[{}],開(kāi)始提交子任務(wù)", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
String jobClass = scheduleContext.getTaskGetResponse().getJobClass();
if (StringUtils.isBlank(jobClass)) {
throw new RuntimeException("jobClass get error");
}
for (int i = 0; i < taskServerParam.getServerCount(); i++) {
Map dataMap = scheduleContext.getParameters();
//提交子任務(wù)標(biāo)識(shí)
dataMap.put("isSubTask", "true");
//給子任務(wù)進(jìn)行編號(hào)
dataMap.put("curServer", String.valueOf(i));
//父任務(wù)名稱傳遞子任務(wù)
dataMap.put("taskName", taskServerParam.getTaskName());
scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept());
}
// 父任務(wù)等待子任務(wù)執(zhí)行完畢再更改狀態(tài),如果執(zhí)行時(shí)間超過(guò)等待時(shí)間,拋異常
//scheduleContext.waitForSubtaskCompleted((long) taskServerParam.getServerCount() * taskServerParam.getExpected());
log.info("請(qǐng)求編號(hào)[{}],執(zhí)行任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}],子任務(wù)個(gè)數(shù)[{}],提交完成....", requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount());
}
/**
* 創(chuàng)建線程池,按配置參數(shù)執(zhí)行task
* @param param 執(zhí)行參數(shù)
* @param tasks 任務(wù)集合
* @param requestNoStr
* @return void
*/
private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List tasks,String requestNoStr) {
int threadCount = param.getThreadCount();
synchronized (this) {
if (this.easyJobExecutor == null) {
this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME);
this.easyJobLastThreadCount = threadCount;
} else if (threadCount > this.easyJobLastThreadCount) {
this.easyJobExecutor.setMaximumPoolSize(threadCount);
this.easyJobExecutor.setCorePoolSize(threadCount);
this.easyJobLastThreadCount = threadCount;
} else if (threadCount < this.easyJobLastThreadCount) {
this.easyJobExecutor.setCorePoolSize(threadCount);
this.easyJobExecutor.setMaximumPoolSize(threadCount);
this.easyJobLastThreadCount = threadCount;
}
}
List> lists = Lists.partition(tasks, param.getExecuteCount());
final CountDownLatch latch = new CountDownLatch(lists.size());
final String requestNo = requestNoStr;
for (final List list : lists) {
this.easyJobExecutor.submit(
new Callable() {
public Object call() throws Exception {
try{
if (StringUtils.isNotBlank(requestNo)) {
MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo);
}
}catch (RuntimeException e){
log.error("easyjob自定義log跟蹤攔截器執(zhí)行異常",e);
}
try {
if (log.isInfoEnabled()) {
log.info("請(qǐng)求編號(hào)[{}],正在執(zhí)行任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}],[{}],條數(shù):[{}]...", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
}
executeTasks(list);
if (log.isInfoEnabled()) {
log.info("請(qǐng)求編號(hào)[{}],執(zhí)行任務(wù),任務(wù)ID[{}],任務(wù)名稱[{}],[{}],條數(shù):[{}]成功!", requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list.size());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
} finally {
try{
MDC.clear();
}catch (RuntimeException ex){
log.error("easyjob自定義log跟蹤攔截器執(zhí)行異常",ex);
}
latch.countDown();
}
return null;
}
}
);
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("interrupted when processing data access request in concurrency", e);
}
}
/**
* 獲取任務(wù)名稱
* @return
*/
private String getTaskNameKey(){
StringBuffer keyBuf = new StringBuffer();
keyBuf.append(activeEnv)
.append(Constants.SEPARATOR_UNDERLINE)
.append(this.getClass().getSimpleName());
return keyBuf.toString();
}
protected void executeTasks(List taskList) {
if(CollectionUtils.isEmpty(taskList)) {
return;
}
this.doTasks(taskList);
}
/**
* 業(yè)務(wù)處理抽象方法
* @param list
*/
protected void doTasks(List list){
if(isDoBatchTasks()){
CallerInfo info = Profiler.registerInfo(getClass().getName()+"_batch", Constants.TRANS_BASIC,false, true);
try {
/** 開(kāi)始執(zhí)行各個(gè)子類真正業(yè)務(wù)邏輯 */
this.doBatchTasks(list);
} catch(CommonBusinessException ex){
log.warn(ex.getMessage());
} catch (Exception e) {
Profiler.functionError(info);
log.error("任務(wù)處理失敗,方法:{},任務(wù):{}",ClassHelper.getMethod(),JSON.toJSONString(list), e);
} finally {
Profiler.registerInfoEnd(info);
}
}else{
for (T task : list) {
CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true);
if(task == null) { continue; }
String lockKey = "";
try {
/** 開(kāi)始執(zhí)行各個(gè)子類真正業(yè)務(wù)邏輯 */
if (useConcurrentLock()) {
lockKey = getLockKey(task);
if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) {
this.doSingleTask(task);
}else{
lockKey = "";
log.warn("lockKey:{},加載失敗,正在被其他用戶鎖定,請(qǐng)重試!",lockKey);
}
} else {
this.doSingleTask(task);
}
} catch(CommonBusinessException ex){
log.warn(ex.getMessage());
} catch (Exception e) {
Profiler.functionError(info);
log.error("任務(wù)處理失敗,方法:{},任務(wù):{}",ClassHelper.getMethod(),JSON.toJSONString(task), e);
} finally {
Profiler.registerInfoEnd(info);
if (StringUtils.isNotBlank(lockKey)) {
redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey);
}
}
}
}
}
/**
* 獲取實(shí)體類的實(shí)際類型
*
* @return
*/
private Class getArgumentType() {
return (Class) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
/**
* 是否使用防并發(fā)鎖
* 默認(rèn)不使用,如需使用子類重寫(xiě)該方法
* @return
*/
protected boolean useConcurrentLock() {
return false;
}
/**
* 根所注解獲取LockKey,可被子類重寫(xiě),提高效率
*
* @param businessObj 業(yè)務(wù)對(duì)象
* @return concurrent lock key
*/
protected String getLockKey( T businessObj) {
StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX);
//若存在注解指定的防重字段,則使用這些字段拼裝防重Key,否則使用MQ業(yè)務(wù)主鍵防重
List valueEntries = getAnnotaionConcurrentKeys(businessObj);
if (!CollectionUtils.isEmpty(valueEntries)) {
for (ValueEntryInfo valueEntry : valueEntries) {
lockKey.append(Constants.SEPARATOR_UNDERLINE);
lockKey.append(valueEntry.getValue());
}
} else {
throw new CommonBusinessException(String.format("此任務(wù)處理需要加分布式鎖,但是未設(shè)置鎖key,所以不做業(yè)務(wù)處理,請(qǐng)檢查,任務(wù)信息:%s",JSON.toJSONString(businessObj)));
}
return lockKey.toString();
}
/**
* 查找對(duì)象的ConccurentKey注解,獲取防重字段,并排序返回
*
* @param businessObj 業(yè)務(wù)對(duì)象
* @return 有序的業(yè)務(wù)字段值列表
*/
private List getAnnotaionConcurrentKeys(T businessObj) {
List valueEntries = new ArrayList();
Field[] fields = businessObj.getClass().getDeclaredFields();
for (int i = 0; i < fields.length; i++) {
ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey.class);
if (concurrentKey != null) {
fields[i].setAccessible(true);
Object fieldVal = null;
try {
ValueEntryInfo valueEntry = new ValueEntryInfo();
fieldVal = fields[i].get(businessObj);
if (fieldVal != null) {
valueEntry.setValue(String.format("%1$s", fieldVal));
valueEntry.setOrder(concurrentKey.order());
valueEntries.add(valueEntry);
}
} catch (IllegalAccessException e) {
log.error("IllegalAccess-{}.{}", businessObj.getClass().getName(), fields[i].getName());
}
}
}
if (valueEntries.size() > 1) {
//排序ConcurrentKey
Collections.sort(valueEntries, new Comparator() {
@Override
public int compare(ValueEntryInfo o1, ValueEntryInfo o2) {
if (o1.getOrder() > o2.getOrder()) {
return 1;
} else if (o1.getOrder() == o2.getOrder()) {
return 0;
} else {
return -1;
}
}
});
}
return valueEntries;
}
protected List selectTasks(TaskServerParam taskServerParam, int curServer) {
return this.loadTasks(taskServerParam, curServer);
}
/**
* 獲取select時(shí)的任務(wù)創(chuàng)建開(kāi)始時(shí)間
* @param serverArg
* @return
*/
protected Date getCreateTimeFrom(String serverArg){
return null;
}
/**
* 是否以批量方式處理任務(wù)
* @return
*/
protected boolean isDoBatchTasks(){
return false;
}
}
實(shí)戰(zhàn)結(jié)果
上述所述均為透?jìng)鱅D場(chǎng)景的原理和示例代碼,實(shí)戰(zhàn)效果如下圖:調(diào)用jsf超時(shí),跨系統(tǒng)查看日志進(jìn)行排查,得知為慢sql引起
上述大部分場(chǎng)景已經(jīng)抽出一個(gè)通用jar包,詳細(xì)使用教程見(jiàn)我的另一篇文章:分布式日志追蹤ID使用教程
審核編輯 黃宇
-
分布式
+關(guān)注
關(guān)注
1文章
919瀏覽量
74572 -
JSF
+關(guān)注
關(guān)注
0文章
12瀏覽量
7762 -
過(guò)濾器
+關(guān)注
關(guān)注
1文章
432瀏覽量
19679
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論