@qq-framework/task-manager
v1.0.0
Published
Projeto Gerenciador de Tarefas das Lojas Quero-Quero
Downloads
4
Keywords
Readme
Como reprocessar tarefas de forma automática
O processo de reprocessamento é uma funcionalidade que permite a execução de tarefas de forma controlada, com tentativas repetidas em caso de falha. Ele é útil para tarefas que podem falhar temporariamente devido a problemas de rede, serviços externos indisponíveis, entre outros.
Pré-requisitos
Para utilizar o reprocessamento de tarefas, será necessário importar a model Reprocessamento nas entities do seu arquivo ormconfig.ts.
const entities = ['dist/src/modules/**/infra/models/*.model{.ts,.js}', ReprocessamentoModel]
export default {
type: 'postgres',
url: process.env.DATABASE_URL,
entities,
synchronize: false,
logging: false,
migrations: ['dist/src/shared/infra/typeorm/migrations/*.js'],
cli: {
migrationsDir: 'src/shared/infra/typeorm/migrations',
},
}Como usar
Atualmente há 2 formas de reprocessar tarefas de forma automática, são elas:
Utilizando o decorator
@RetryableEventPatterne o interceptor@RetryableEventPatternDlqInterceptor, ambos utilizam o Kafka para reprocessar as mensagens.Utilizando a classe
ControleProcessamentoe a classeControleReprocessamento, que são classes que permitem reprocessar tarefas de forma automática, sem a necessidade de utilizar o Kafka (mas que também podem implementar envio de eventos).
1. Utilizando o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor
Para utilizar o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor, você precisa seguir 2 principais etapas, são elas:
Etapa 1: Criar o Processamento
Inserir o decorator @RetryableEventPattern no método que deseja reprocessar, informando os parâmetros necessários.
@RetryableEventPattern(
param1,
param2,
param3,
param4
)O decorator RetryableEventPattern recebe quatro argumentos:
Param1-> topico: O nome do tópico do Kafka que será enviado a mensagem em caso de falha.Param2-> quantidadeCiclos: O número máximo de ciclos de reprocessamento. O valor padrão é 1.Param3-> quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo. O valor padrão é 3.Param4-> processo: O nome do processo à ser executado.
Exemplo de uso
@RetryableEventPattern(
KafkaTopics.IMAGEM_PRODUTO_IMPORTADA_FALHA_TOPIC,
1,
3,
'ImportacaoController.atualizarProdutoImagemFalha'
)
public async atualizarProdutoImagemFalha(
@Payload() mensagem: AtualizarStatusImportacaoUseCaseProps,
@Ctx() context: KafkaContext
): ResultAsync<any, void> {
return await this.atualizarProdutoImagemUseCase.execute(mensagem)
}Nesse exemplo, o método atualizarProdutoImagemFalha será reprocessado uma vez, com até 3 tentativas por ciclo, e o processo será chamado ImportacaoController.atualizarProdutoImagemFalha.
OBS: É necessário que seja fornecido o KafkaContext nos parâmetros
Etapa 2: Criar o Reprocessamento
Inserir o interceptor @RetryableEventPatternDlqInterceptor no controller que deseja reprocessar, informando os parâmetros necessários.
@UseInterceptors(RetryableEventPatternDlqInterceptor)O interceptor RetryableEventPatternDlqInterceptor não recebe argumentos. Mas, para que o reprocessamento funcione, é necessário que o método reprocessar seja chamado periodicamente, no tempo que achar necessário e que a rota do endpoint seja chamada passando os seguintes parâmetros no body:
{
"topico": "nome-do-topico",
"processo": "nome-do-processo",
"minutosEntreCiclos": 1
}OBS: O valor padrão para o parâmetro minutosEntreCiclos é 1 minuto.
Exemplo de uso
@Post('/teste')
@UseInterceptors(RetryableEventPatternDlqInterceptor)
public async atualizarProdutoImagemSucesso(@Body() props: PROPS_TYPE): Promise<HttpResponseError | HttpResponseOk> {
return super.buildResponse({
result: R.ok(),
})
}Neste exemplo, para reprocessar o processo dado como exemplo anteriormente, a rota /teste será chamada da seguinte forma:
curl --location 'http://IP:PORTA/PATH/MODULO/teste' \
--header 'Content-Type: application/json' \
--data '{
"topico": "nome-do-topico",
"processo": "ImportacaoController.atualizarProdutoImagemFalha",
"minutosEntreCiclos": 1
}'2. Utilizando a classe ControleProcessamento e a classe ControleReprocessamento
Para utilizar o controle de processamento, você precisa seguir 2 principais etapas, são elas:
Etapa 1: Criar o Processamento
Há duas maneiras de criar um processamento, a primeira é utilizando o método processar e a segunda é utilizando o método processarComCallback. A diferença entre eles é que o método processar irá realizar a função passada como parâmetro novamente, de forma autómatica, o número de vezes padronizado. Enquanto o método processarComCallback permite que você defina um callback personalizado, que será executado quando a função principal falhar. Segue abaixo os exemplos de uso de ambos os métodos:
Método processar
Instanciar a classe ControleProcessamento informando os parâmetros e chamar o método processar.
const controle = new ControleProcessamento(param1, param2)
await controle.processar(param3, param4)O classe ControleProcessamento recebe dois argumentos:
Param1-> processo: O nome do processo à ser executadoParam2-> parametros: Objeto que contém os parâmetros para o processamento.
O método processar recebe dois argumentos:
Param3-> props: Os dados que serão utilizados para chamar acallbackParam4-> callback: A função que será executada
Os parâmetros para o processamento são:
- quantidadeCiclos: O número máximo de ciclos de reprocessamento.
- quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo.
- cicloAtual: O ciclo atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um ciclo específico.
- numeroTentativa: O número da tentativa atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar uma tentativa específica.
- id: O id do processamento. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um processamento específico.
OBS: Ao informar os parâmetros opcionais, tenha em mente que o processo poderá não seguir a ordem padrão de reprocessamento.
Exemplo:
const parametros = {
quantidadeCiclos: 3,
quantidadeTentativasPorCiclo: 3,
}Exemplo de uso
Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:
const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
quantidadeCiclos: 3,
quantidadeTentativasPorCiclo: 3,
})
await controle.processar(payload.data, async (processamento: Processamento) => {
const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
skuId: processamento.props['idSkuVtex'],
quantity: processamento.props['totalAvailableEcm'],
warehouseId: processamento.props['idWarehouse'],
branchId: processamento.props['idBranch'],
})
if (retornoVtex.isFailure()) {
return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
}
return R.ok()
})OBS: Neste exemplo, a tarefa é atualizar o inventário em um serviço externo. Se a atualização falhar, a tarefa será reprocessada até 3 vezes por ciclo, com até 3 ciclos, e um intervalo de 60 minutos entre os ciclos.
Método processarComCallback
A diferença entre o método processar e o método processarComCallback é que o método processarComCallback permite que você defina um callback personalizado, que será executado quando a função principal falhar. Portanto, a utilização é a mesma, com a diferença de que o método processarComCallback recebe um callback como parâmetro.
Exemplo de uso
Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:
const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
quantidadeCiclos: 3,
quantidadeTentativasPorCiclo: 3,
})
await controle.processar(
payload.data,
async (processamento: Processamento) => {
const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
skuId: processamento.props['idSkuVtex'],
quantity: processamento.props['totalAvailableEcm'],
warehouseId: processamento.props['idWarehouse'],
branchId: processamento.props['idBranch'],
})
if (retornoVtex.isFailure()) {
return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
}
return R.ok()
},
async (processamento: Processamento) => {
// Neste callback, você pode fazer o que quiser, como enviar uma mensagem para um tópico do Kafka
await this.kafkaProducerService.post({
conteudo: JSON.stringify({
correlationId: processamento.id,
cycles: processamento.cicloAtual,
retries: processamento.numeroTentativa,
data: {
...processamento.props,
},
}),
chave: processamento.id,
topico: KafkaTopics.REPROCESSED_STOCK,
headers: headers,
})
return R.ok()
}
)Etapa 2: Criar o Reprocessamento
Quando um Processamento atinge o máximo de tentativas, ele é marcado como PENDENTE e salvo na tabela reprocessamento. Para reprocessar um Processamento marcado como PENDENTE, você deve criar um novo método no controller que sejá chamado periodicamente, no tempo que achar necessário.
OBS: É interessante que não sejam executados mais 1 reprocessamento ao mesmo tempo, pois pode acabar gerando concorência. Para isso, é possível utilizar as configurações do AbstractUseCase, definindo o processo com singleThread
Esse método deve chamar algum processo que instâncie a classe ControleReprocessamento e chamar o método reprocessar,
const controle = new ControleReprocessamento(param1, param2)
await controle.reprocessar(param3)A classe ControleReprocessamento recebe dois argumentos:
Param1-> processo: O nome do processo à ser executadoParam2-> minutosEntreCiclos: O número de minutos entre um ciclo e outro utilizado como filtro na query feita na tabela Reprocessamento, tendo como base o valor da colunadt_criacao. O valor padrão é 1 minuto.
O método reprocessar recebe uma função de callback que será executada para cada Processamento marcado como PENDENTE.
OBS 1: O método reprocessar irá reprocessar todos os processamentos marcados como PENDENTE que estão dentro do intervalo de tempo definido pelo parâmetro minutosEntreCiclos.
Exemplo de uso
Aqui está um exemplo de como usar o reprocessamento para o mesmo processo de atualização de estoque em um serviço externo:
@Post('/reprocessar-teste')
async teste(): Promise<HttpResponseOk | HttpResponseError> {
const controle = new ControleReprocessamento(
ParallelStockUpdaterServiceImpl.name,
1
)
await controle.reprocessar(
async (processamento: Processamento) => {
const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
skuId: processamento.props['idSkuVtex'],
quantity: processamento.props['totalAvailableEcm'],
warehouseId: processamento.props['idWarehouse'],
branchId: processamento.props['idBranch'],
})
if (retornoVtex.isFailure()) {
return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
}
return R.ok()
}
)
return super.buildResponse({
result: R.ok(`Reprocessamento iniciado!`),
successStatusCode: 202,
})
}Exceções
Existem três exceções que podem ser lançadas dentro da função de callback e que são tratadas pelo processo de reprocessamento:
ErroProcessamento: Esta exceção deve ser lançada quando ocorrer um erro que pode ser resolvido automaticamente. O processo de reprocessamento irá tentar novamente. Se esta exceção for lançada, o processo será marcado comoPENDENTEe será reprocessado conforme o parâmetrominutosEntreCiclos.ProcessamentoIrrecuperavelException: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o processo será marcado comoERRO_PERMANENTEe não será reprocessado.CicloInvalidoException: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente neste ciclo. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o ciclo atual será ignorado E O processo será marcado comoPENDENTEe reprocessado conforme o parâmetrominutosEntreCiclos.
