Estoy escribiendo una aplicación que tiene un trabajo cron que se ejecuta cada 60 segundos. La aplicación está configurada para escalar cuando sea necesario en varias instancias. Solo quiero ejecutar la tarea en 1 instancia cada 60 segundos (en cualquier nodo). Fuera de la caja, no puedo encontrar una solución a esto y me sorprende que no me hayan preguntado varias veces antes. Estoy usando Spring 4.1.6.
<task:scheduled-tasks>
<task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
spring
spring-scheduled
usuario3131879
fuente
fuente
CronJob
enkubernetes
?Respuestas:
Hay un proyecto ShedLock que sirve exactamente para este propósito. Solo anota tareas que deberían bloquearse cuando se ejecutan
@Scheduled( ... ) @SchedulerLock(name = "scheduledTaskName") public void scheduledTask() { // do something }
Configurar Spring y un LockProvider
@Configuration @EnableScheduling @EnableSchedulerLock(defaultLockAtMostFor = "10m") class MySpringConfiguration { ... @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider(dataSource); } ... }
fuente
Creo que debe usar Quartz Clustering con JDBC-JobStore para este propósito
fuente
Es otra forma sencilla y sólida de ejecutar de forma segura un trabajo en un clúster. Puede basarse en la base de datos y ejecutar la tarea solo si el nodo es el "líder" en el clúster.
Además, cuando un nodo falla o se apaga en el clúster, otro nodo se convierte en el líder.
Todo lo que tiene es crear un mecanismo de "elección de líder" y cada vez que verifique si usted es el líder:
@Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
Siga esos pasos:
1.Defina el objeto y la tabla que contiene una entrada por nodo en el clúster:
@Entity(name = "SYS_NODE") public class SystemNode { /** The id. */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** The name. */ @Column(name = "TIMESTAMP") private String timestamp; /** The ip. */ @Column(name = "IP") private String ip; /** The last ping. */ @Column(name = "LAST_PING") private Date lastPing; /** The last ping. */ @Column(name = "CREATED_AT") private Date createdAt = new Date(); /** The last ping. */ @Column(name = "IS_LEADER") private Boolean isLeader = Boolean.FALSE; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getTimestamp() { return timestamp; } public void setTimestamp(final String timestamp) { this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(final String ip) { this.ip = ip; } public Date getLastPing() { return lastPing; } public void setLastPing(final Date lastPing) { this.lastPing = lastPing; } public Date getCreatedAt() { return createdAt; } public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt; } public Boolean getIsLeader() { return isLeader; } public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader; } @Override public String toString() { return "SystemNode{" + "id=" + id + ", timestamp='" + timestamp + '\'' + ", ip='" + ip + '\'' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + '}'; }
}
2.Cree el servicio que a) inserte el nodo en la base de datos, b) verifique el líder
@Service @Transactional public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener { /** The logger. */ private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class); /** The constant NO_ALIVE_NODES. */ private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}"; /** The ip. */ private String ip; /** The system service. */ private SystemService systemService; /** The system node repository. */ private SystemNodeRepository systemNodeRepository; @Autowired public void setSystemService(final SystemService systemService) { this.systemService = systemService; } @Autowired public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository; } @Override public void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); } } @Override public void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } } /** * Returns the leaded * @param list * the list * @return the leader */ private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null; } @Override public boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader(); } @Override public void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); } } /** * Creates the node */ private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node); } /** * Updates the node */ private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node); } /** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */ private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList; } /** * Finds the min name node. * * @param list * the list * @return the min node */ private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min; } /** * Sets the leader flag. * * @param list * the list * @param value * the value */ private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); } }
}
3.pegar la base de datos para enviar que estás vivo
@Override @Scheduled(cron = "0 0/5 * * * ?") public void executeSystemNodePing() { systemNodeService.pingNode(); } @Override @Scheduled(cron = "0 0/10 * * * ?") public void executeLeaderResolution() { systemNodeService.checkLeaderShip(); }
4.¡Estás listo! Solo verifique si es el líder antes de ejecutar la tarea:
@Override @Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
fuente
Los trabajos por lotes y programados generalmente se ejecutan en sus propios servidores independientes, lejos de las aplicaciones orientadas al cliente, por lo que no es un requisito común incluir un trabajo en una aplicación que se espera que se ejecute en un clúster. Además, los trabajos en entornos agrupados normalmente no necesitan preocuparse de que otras instancias del mismo trabajo se ejecuten en paralelo, por lo que es otra razón por la que el aislamiento de las instancias de trabajo no es un requisito importante.
Una solución simple sería configurar sus trabajos dentro de un Spring Profile. Por ejemplo, si su configuración actual es:
<beans> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans>
cámbielo a:
<beans> <beans profile="scheduled"> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans> </beans>
Luego, inicie su aplicación en una sola máquina con el
scheduled
perfil activado (-Dspring.profiles.active=scheduled
).Si el servidor principal no está disponible por algún motivo, simplemente inicie otro servidor con el perfil habilitado y las cosas seguirán funcionando bien.
Las cosas cambian si también desea una conmutación por error automática para los trabajos. Luego, deberá mantener el trabajo en ejecución en todos los servidores y verificar la sincronización a través de un recurso común, como una tabla de base de datos, un caché agrupado, una variable JMX, etc.
fuente
get
y laset
operación para que archieve.dlock está diseñado para ejecutar tareas solo una vez mediante el uso de índices y restricciones de la base de datos. Simplemente puede hacer algo como a continuación.
@Scheduled(cron = "30 30 3 * * *") @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES) public void execute() { }
Consulte el artículo sobre su uso.
fuente
Estoy usando una tabla de base de datos para hacer el bloqueo. Solo una tarea a la vez puede insertar en la tabla. El otro obtendrá una DuplicateKeyException. La lógica de inserción y eliminación es manejada por un aspecto alrededor de la anotación @Scheduled. Estoy usando Spring Boot 2.0
@Component @Aspect public class SchedulerLock { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class); @Autowired private JdbcTemplate jdbcTemplate; @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))") public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable { String jobSignature = joinPoint.getSignature().toString(); try { jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()}); Object proceed = joinPoint.proceed(); jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature}); return proceed; }catch (DuplicateKeyException e) { LOGGER.warn("Job is currently locked: "+jobSignature); return null; } } }
@Component public class EveryTenSecondJob { @Scheduled(cron = "0/10 * * * * *") public void taskExecution() { System.out.println("Hello World"); } }
CREATE TABLE scheduler_lock( signature varchar(255) NOT NULL, date datetime DEFAULT NULL, PRIMARY KEY(signature) );
fuente
Puede utilizar un planificador incrustable como db-planificador para lograr esto. Tiene ejecuciones persistentes y utiliza un mecanismo de bloqueo optimista simple para garantizar la ejecución por un solo nodo.
Código de ejemplo sobre cómo se puede lograr el caso de uso:
RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60))) .execute((taskInstance, executionContext) -> { System.out.println("Executing " + taskInstance.getTaskAndInstance()); }); final Scheduler scheduler = Scheduler .create(dataSource) .startTasks(recurring1) .build(); scheduler.start();
fuente
El contexto de Spring no está agrupado, por lo que administrar la tarea en una aplicación distribuida es un poco difícil y necesita usar sistemas que admitan jgroup para sincronizar el estado y dejar que su tarea tome la prioridad para ejecutar la acción. O puede usar el contexto ejb para administrar el servicio ha singleton agrupado como el entorno jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd O puede usar el caché agrupado y el recurso de bloqueo de acceso entre el servicio y el primer servicio, tome el bloqueo, realizará la acción o implementará su propio jgroup para comunicar su servicio y realizar la acción en un nodo
fuente