¿Cómo usar MDC con grupos de subprocesos?

146

En nuestro software, utilizamos ampliamente MDC para rastrear cosas como ID de sesión y nombres de usuario para solicitudes web. Esto funciona bien mientras se ejecuta en el hilo original. Sin embargo, hay muchas cosas que deben procesarse en segundo plano. Para eso utilizamos las clases java.concurrent.ThreadPoolExecutory java.util.Timerjunto con algunos servicios de ejecución asíncrona auto-enrollados. Todos estos servicios administran su propio grupo de subprocesos.

Esto es lo que dice el manual de Logback sobre el uso de MDC en dicho entorno:

Los subprocesos de trabajo no siempre pueden heredar una copia del contexto de diagnóstico asignado del subproceso inicial. Este es el caso cuando se usa java.util.concurrent.Executors para la gestión de subprocesos. Por ejemplo, el método newCachedThreadPool crea un ThreadPoolExecutor y, al igual que otros códigos de agrupación de subprocesos, tiene una intrincada lógica de creación de subprocesos.

En tales casos, se recomienda invocar MDC.getCopyOfContextMap () en el subproceso original (maestro) antes de enviar una tarea al ejecutor. Cuando se ejecuta la tarea, como su primera acción, debe invocar MDC.setContextMapValues ​​() para asociar la copia almacenada de los valores originales de MDC con el nuevo subproceso administrado por el Ejecutor.

Esto estaría bien, pero es muy fácil olvidar agregar esas llamadas, y no hay una manera fácil de reconocer el problema hasta que sea demasiado tarde. El único signo con Log4j es que te falta información de MDC en los registros, y con Logback obtienes información de MDC obsoleta (ya que el subproceso en la banda de rodadura hereda su MDC de la primera tarea que se ejecutó). Ambos son problemas serios en un sistema de producción.

No veo nuestra situación especial de ninguna manera, sin embargo, no pude encontrar mucho sobre este problema en la web. Aparentemente, esto no es algo con lo que muchas personas se topan, por lo que debe haber una forma de evitarlo. ¿Qué estamos haciendo mal aquí?

Lóránt Pintér
fuente
1
Si su aplicación se implementa en un entorno JEE, puede usar interceptores java para configurar el contexto MDC antes de la invocación de EJB.
Maxim Kirilov
2
A partir de la versión 1.1.5 de logback, los subprocesos secundarios ya no heredan los valores de MDC.
Ceki
jira.qos.ch/browse/LOGBACK-422 resuelto
lyjackal
2
@Ceki La documentación debe actualizarse: "Un subproceso secundario hereda automáticamente una copia del contexto de diagnóstico asignado de su elemento primario". logback.qos.ch/manual/mdc.html
steffen
Creé una solicitud de extracción para slf4j que resuelve el problema del uso de MDC a través de subprocesos (enlace github.com/qos-ch/slf4j/pull/150 ). Puede ser, si la gente comenta y lo solicita, incorporarán el cambio en el SLF4J :)
Hombre

Respuestas:

79

Sí, este es un problema común con el que también me he encontrado. Hay algunas soluciones alternativas (como configurarlo manualmente, como se describe), pero idealmente desea una solución que

  • Establece el MDC de manera consistente;
  • Evita errores tácitos donde el MDC es incorrecto pero usted no lo sabe; y
  • Minimiza los cambios en la forma en que usa grupos de subprocesos (por ejemplo, subclases Callablecon MyCallablecualquier lugar o fealdad similar)

Aquí hay una solución que uso que satisface estas tres necesidades. El código debe explicarse por sí mismo.

(Como nota al margen, este ejecutor se puede crear y alimentar a Guava's MoreExecutors.listeningDecorator(), si usa Guava's ListanableFuture).

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
jlevy
fuente
En caso de que el contexto anterior no esté vacío, ¿no es siempre basura? ¿Por qué lo llevas?
djjeck
2
Correcto; No debe establecerse. Parece una buena higiene, por ejemplo, si el método wrap () fue expuesto y utilizado por otra persona en el camino.
jlevy
¿Puede proporcionar una referencia de cómo Log4J2 adjuntó o hizo referencia a este MdcThreadPoolExecutor? ¿Hay algún lugar donde necesitemos hacer referencia específica a esta clase, o se hace "automáticamente"? No estoy usando guayaba. Podría, pero me gustaría saber si hay alguna otra forma antes de usarlo.
jcb
Si entiendo su pregunta correctamente, la respuesta es sí, son variables locales de hilo "mágicas" en SLF4J - vea las implementaciones de MDC.setContextMap () etc. También, por cierto, esto usa SLF4J, no Log4J, lo cual es preferible ya que funciona con Log4j, Logback y otras configuraciones de registro.
jlevy
1
Solo para completar: si está usando Spring's en ThreadPoolTaskExecutorlugar de Java simple ThreadPoolExecutor, puede usar lo MdcTaskDecoratordescrito en moelholm.com/2017/07/24/…
Pino
27

Nos hemos encontrado con un problema similar. Es posible que desee extender ThreadPoolExecutor y anular los métodos before / afterExecute para realizar las llamadas MDC que necesita antes de comenzar / detener nuevos hilos.

marca
fuente
10
Los métodos beforeExecute(Thread, Runnable)y afterExecute(Runnable, Throwable)pueden ser útiles en otros casos, pero no estoy seguro de cómo funcionará para establecer MDC. Ambos se ejecutan bajo el hilo generado. Esto significa que debe poder obtener el mapa actualizado desde el hilo principal antes beforeExecute.
Kenston Choi
Es mejor establecer MDC en el filtro, eso significa que cuando la solicitud está siendo procesada por la lógica empresarial, el contexto no se actualizará. No creo que debamos actualizar MDC en todas partes de la aplicación
Desbloquee el
15

En mi humilde opinión la mejor solución es:

  • utilizar ThreadPoolTaskExecutor
  • implementa tu propio TaskDecorator
  • úsalo: executor.setTaskDecorator(new LoggingTaskDecorator());

El decorador puede verse así:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}
Tomáš Myšík
fuente
Lo siento, no estoy seguro de lo que quieres decir. ACTUALIZACIÓN: Creo que veo ahora, mejorará mi respuesta.
Tomáš Myšík
6

Así es como lo hago con grupos de subprocesos fijos y ejecutores:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

En la parte de roscado:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});
Amaury D
fuente
2

Al igual que las soluciones publicadas anteriormente, los newTaskFormétodos para Runnabley Callablese pueden sobrescribir para ajustar el argumento (ver solución aceptada) al crear el RunnableFuture.

Nota: Por consiguiente, se debe llamar al método executorService's en submitlugar del executemétodo.

Para el ScheduledThreadPoolExecutor, los decorateTaskmétodos se sobrescribirán en su lugar.

Mi llave_
fuente
0

Otra variación similar a las respuestas existentes aquí es implementar ExecutorServicey permitir que se le pase un delegado. Luego, utilizando genéricos, aún puede exponer al delegado real en caso de que uno quiera obtener algunas estadísticas (siempre y cuando no se utilicen otros métodos de modificación).

Código de referencia:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}
Kenston Choi
fuente
-3

Pude resolver esto usando el siguiente enfoque

En el hilo principal (Application.java, el punto de entrada de mi aplicación)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

En el método de ejecución de la clase que Executer llama

MDC.setContextMap(Application.mdcContextMap);
smishra
fuente