Go 构建基础的事件调度器

news2025/6/19 13:00:18

👇我在这儿 

当我们需要在一段时间后的特定时间或间隔运行任务时,我们需要使用任务调度系统来运行任务:例如发送电子邮件、推送通知、午夜关闭账户、清空表格等。

在本文中,我们将构建一个基本的事件调度程序,使用数据库作为持久层来调度事件在特定时间段运行,这将使我们了解事件调度系统的工作原理。

基本的工作机制是:

每当我们需要调度事件时,计划作业就会添加到数据库中以在特定时间运行。

另一个任务始终定期运行以检查数据库中的某些任务是否已过期, 如果在数据库中发现已过期任务(轮询)则运行计划作业。

efa3163124daae54f09d1240fe4b0290.png

让我们从创建用于存储事件的数据库(在 postgresql 中)开始:

CREATE TABLE IF NOT EXISTS "public"."jobs" (     
   "id"      SERIAL PRIMARY KEY,     
   "name"    varchar(50) NOT NULL,     
   "payload" text,     
   "runAt"   TIMESTAMP NOT NULL    
)

现在,我们来定义数据结构:

  • Event : 调度事件

  • Listeners : 事件监听器列表

  • ListenFunc: 触发事件时调用的函数

// Listeners has attached event listeners
type Listeners map[string]ListenFunc

// ListenFunc function that listens to events
type ListenFunc func(string)

// Event structure
type Event struct {
 ID      uint
 Name    string
 Payload string
}

还需要定义 Scheduler 结构,用于调度事件和运行侦听器:

// Scheduler data structure
type Scheduler struct {
 db        *sql.DB
 listeners Listeners
}

// NewScheduler creates a new scheduler
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
 return Scheduler{
  db:        db,
  listeners: listeners,
 }
}

在第 8 行到第 13 行中,我们通过将sql.DB实例和初始侦听器传递给调度程序来创建新的调度程序。

现在,我们实现调度函数,并将我们的事件插入到 jobs 表中:

// Schedule sechedules the provided events
func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
 log.Print("🚀 Scheduling event ", event, " to run at ", runAt)
 _, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt") VALUES ($1, $2, $3)`, event, payload, runAt)
 if err != nil {
  log.Print("schedule insert error: ", err)
 }
}

// AddListener adds the listener function to Listeners
func (s Scheduler) AddListener(event string, listenFunc ListenFunc) {
 s.listeners[event] = listenFunc
}

在 AddListener 函数中,我们为事件分配监听函数。

我们已经首先完成了添加 jobs 表。现在需要从数据库中获取已经过期的作业,执行然后删除它们。

下面的函数实现显示了我们如何检查表中的过期事件并将事件序列化到 Event 结构中:

// checkDueEvents checks and returns due events
func (s Scheduler) checkDueEvents() []Event {
 events := []Event{}
 rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
 if err != nil {
  log.Print("💀 error: ", err)
  return nil
 }
 for rows.Next() {
  evt := Event{}
  rows.Scan(&evt.ID, &evt.Name, &evt.Payload)
  events = append(events, evt)
 }
 return events
}

第二步是调用从数据库中找到的已注册事件侦听器,如下所示:

// callListeners calls the event listener of provided event
func (s Scheduler) callListeners(event Event) {
 eventFn, ok := s.listeners[event.Name]
 if ok {
  go eventFn(event.Payload)
  _, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
  if err != nil {
   log.Print("💀 error: ", err)
  }
 } else {
  log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
 }

}// callListeners calls the event listener of provided event
func (s Scheduler) callListeners(event Event) {
 eventFn, ok := s.listeners[event.Name]
 if ok {
  go eventFn(event.Payload)
  _, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
  if err != nil {
   log.Print("💀 error: ", err)
  }
 } else {
  log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
 }

}

在这里,我们正在检查是否有绑定的事件函数,如果找到则调用事件的监听器函数。

第 6 行到第 9 行将从数据库中删除事件,以便在下次轮询数据库时不会再找到。

最后一步是(轮询)检查某个事件是否在给定时间间隔内过期。

对于间隔运行的任务,我们使用 time 库的 ticker 函数,该函数将提供一个通道,该通道在提供的间隔内接收新的 tick

// CheckEventsInInterval checks the event in given interval
func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) {
 ticker := time.NewTicker(duration)
 go func() {
  for {
   select {
   case <-ctx.Done():
    ticker.Stop()
    return
   case <-ticker.C:
    log.Println("⏰ Ticks Received...")
    events := s.checkDueEvents()
    for _, e := range events {
     s.callListeners(e)
    }
   }

  }
 }()
}

在第 7 行和第 10 行中,我们检查上下文是否已关闭或 ticker通道是否正在接收新的 tick

在 11 行接收到 tick 后,我们检查到期事件,然后调用所有事件的侦听器函数。

下一步就是在 main.go 中,实际使用我们前面定义的那些函数,如下所示:

package main

import (
 "context"
 "log"
 "os"
 "os/signal"
 "time"

 "github.com/dipeshdulal/event-scheduling/customevents"
)

var eventListeners = Listeners{
 "SendEmail": customevents.SendEmail,
 "PayBills":  customevents.PayBills,
}

func main() {
 ctx, cancel := context.WithCancel(context.Background())

 interrupt := make(chan os.Signal, 1)
 signal.Notify(interrupt, os.Interrupt)

 db := initDBConnection()

 scheduler := NewScheduler(db, eventListeners)
 scheduler.CheckEventsInInterval(ctx, time.Minute)

 scheduler.Schedule("SendEmail", "mail: nilkantha.dipesh@gmail.com", time.Now().Add(1*time.Minute))
 scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))

 go func() {
  for range interrupt {
   log.Println("\n❌ Interrupt received closing...")
   cancel()
  }
 }()

 <-ctx.Done()
}

在第13行到第16行中,我们将侦听函数绑定到事件 SendEmail 和 PayBills上,以便在发生新事件时调用这些函数。

在 22行 和 32 到 37 行中,我们添加了中断信号(os.Interrupt)通道,当程序中发生中断时,我们执行 19 行中的上下文取消函数。

从第 26 行到第 30 行,我们定义事件调度程序、运行轮询函数并将在一分钟后运行 SendEmail ,两分钟后运行 PayBills

程序的输出将如下所示:

2021/01/16 11:58:49 💾 Seeding database with table...
2021/01/16 11:58:49 🚀 Scheduling event SendEmail to run at 2021-01-16 11:59:49.344904505 +0545 +0545 m=+60.004623549
2021/01/16 11:58:49 🚀 Scheduling event PayBills to run at 2021-01-16 12:00:49.34773798 +0545 +0545 m=+120.007457039
2021/01/16 11:59:49 ⏰ Ticks Received...
2021/01/16 11:59:49 📨 Sending email with data:  mail: nilkantha.dipesh@gmail.com
2021/01/16 12:00:49 ⏰ Ticks Received...
2021/01/16 12:01:49 ⏰ Ticks Received...
2021/01/16 12:01:49 💲 Pay me a bill:  paybills: $4,000 bill
2021/01/16 12:02:49 ⏰ Ticks Received...
2021/01/16 12:03:49 ⏰ Ticks Received...
^C2021/01/16 12:03:57 
❌ Interrupt received closing...

从输出中,我们可以看到 SendEmail 在一分钟后触发,事件 PayBills 在第二分钟后触发。

通过这种方式,我们构建了一个基本的事件调度系统,它将在一定时间间隔后调度事件。

这个例子只展示了事件调度程度的基本实现,并未覆盖诸如:如果两个轮询间隔之间发生重叠,如何处理,如何不使用轮询等。我们可以使用 rabbitmqkafka 等完成一个最终严谨的事件调度程度。

原文地址:

https://medium.com/wesionary-team/building-basic-event-scheduler-in-go-134c19f77f84

原文作者:

Dipesh Dulal

本文永久链接:https://github.com/gocn/translator/blob/master/2023/w15_Building_Basic_Event_Scheduler_in_Go.md

译者:lsj1342

校对:cvley


7479ef6e10050cc5caeda1eb0938aedb.jpeg

Ding! 您有一份参会邀请待查收👇

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/411639.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于springboot和ajax的简单项目 02.一直会出现的页面的上一页,下一页,分页与总页数 (下)

在各种功能中会一直出现页面分页的问题。 对此&#xff0c;可以使用pojo对象&#xff0c;来一直管理页面分页的功能。 01.创建相关的pojo对象。 由于属性是来辅助sql语句的&#xff0c;这个pojo对象。 Setter Getter ToString NoArgsConstructorpublic class PageObject<T&…

MySQL数据库实现主从同步

安装MySQL数据库8.0.32 前言 今天来学习数据库主从同步的原理及过程&#xff0c;数据库主要是用来存储WEB数据&#xff0c;在企业当中是极为重要的&#xff0c;下面一起来看下。 1.1 数据库做主从的目的 MySQL主从复制在中小企业&#xff0c;大型企业中广泛使用&#xff0c…

【ROS2指南-2】入门 turtlesim 和 rqt

目标&#xff1a;安装并使用 turtlesim 包和 rqt 工具为即将到来的教程做准备。 教程级别&#xff1a;初学者 时间&#xff1a; 15分钟 内容 背景 先决条件 任务 1 安装turtlesim 2 启动turtlesim 3 使用turtlesim 4 安装rqt 5 使用 rqt 6 重新映射 7 关闭turtlesim …

算法 || DFS(深度优先搜索) BFS(广度优先搜索)

&#xff11;、基本概念 dfs全称为Depth First Search,即深度优先搜索。它的思想是沿着每一条可能的路径一个节点一个节点地往下搜索,搜到了路径的到终点再回溯,一直到所有路径搜索完为止。 bfsbfs全称为Breath First Search,即广度(宽度)优先搜索。它的思想是将每一层的结搜…

【SQL】数据库的创建,表的创建、更新、删除

本文内容参考书籍《SQL基础教程》&#xff0c;初学者&#xff0c;请多指教。 一、数据库的创建 1、创建数据库语句 CREATE DATABASE <数据库名称>; 2、示例 CREATE DATABAST shop&#xff1b; 二、表的创建 创建表之前&#xff0c;必须先创建用于存储表的数据库。 1、…

如何找出消耗CPU最多的线程?

如何找出消耗CPU最多的线程&#xff1f; 1.使用 top -c 找出所有当前进程的运行列表 top -c 2.按P(Shiftp)对所有进程按CPU使用率进行排序&#xff0c;找出消耗最高的线程PID ​​​ 显示Java进程 PID 为 136 的java进程消耗最 3.使用 top -Hp PID&#xff0c;查出里面消…

JavaEE简单示例——基于XML配置文件的SSM整合

SSM整合 在本节中&#xff0c;我们会将之前我们学习过的三个框架结合起来&#xff0c;让他们可以融合起来&#xff0c;搭建成一个完整的贯穿整个三层架构的整体框架。 三层框架与对应的框架功能 我们首先回顾一下我们编写软件的三层框架以及对应使用的框架都分别是什么&…

代码随想录算法训练营第五十九天| 503. 下一个更大元素 II、42. 接雨水。

503. 下一个更大元素 II 题目链接&#xff1a;力扣 题目要求&#xff1a; 给定一个循环数组 nums &#xff08; nums[nums.length - 1] 的下一个元素是 nums[0] &#xff09;&#xff0c;返回 nums 中每个元素的 下一个更大元素 。数字 x 的 下一个更大的元素 是按数组遍历顺…

( “树” 之 DFS) 110. 平衡二叉树 ——【Leetcode每日一题】

110. 平衡二叉树 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a; 一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过 1 。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] …

智慧钢铁厂人员定位系统解决方案,助力钢铁企业提升安全生产管理水平

作为国民经济的基础原材料产业&#xff0c;钢铁工业在经济发展中具有重要地位。中国钢铁工业不仅为我国国民经济的快速发展做出了重大贡献&#xff0c;也为世界经济的繁荣和世界钢铁工业的发展起到了积极的促进作用&#xff0c;但钢铁行业在快速发展的同时也存在一些安全管理问…

React应用(基于React脚手架)

目录前言&#xff1a;一、使用create-react-app创建react应用1、什么是 react 脚手架&#xff1f;2. 创建 cli 脚手架方式13. 创建 cli 脚手架方式24. npx:5. react脚手架项目结构6. 功能界面的组件化编码流程&#xff08;通用&#xff09;7. 如何更改脚手架版本二、React 组件…

【SpringMVC】7—文件上传

⭐⭐⭐⭐⭐⭐ Github主页&#x1f449;https://github.com/A-BigTree 笔记链接&#x1f449;https://github.com/A-BigTree/Code_Learning ⭐⭐⭐⭐⭐⭐ 如果可以&#xff0c;麻烦各位看官顺手点个star~&#x1f60a; 如果文章对你有所帮助&#xff0c;可以点赞&#x1f44d;…

【深入理解SSD 实践】对NVMe SSD热插拔时,正确做法是怎样的?

声明 主页&#xff1a;元存储的博客_CSDN博客 依公开知识及经验整理&#xff0c;如有误请留言。 个人辛苦整理&#xff0c;付费内容&#xff0c;禁止转载。 内容摘要 前言 概念 SAS/SATA 和NVMe 区别 热插拔分类 热插拔基本原理 如何确认是否支持热插拔&#xff1f; 热插…

Python批量导出阿里云ECS和Redis实例的监控数据到Excel

背景 某公司使用阿里云的 ECS 和 Redis 服务作为其业务支撑&#xff0c;为了及时了解机器的使用情况&#xff0c;领导要求业务部门对所有阿里云机器的平均资源使用率进行统计&#xff0c;并汇总在一个 Excel 表格中&#xff0c;以便领导查看和分析。 需求 为了满足领导的需求…

C++ const的作用

1.const在C中是只读变量&#xff0c;在C中表示常量 以下代码&#xff0c;在C中是错误的&#xff0c;但是在C中是正确的。 void main() {const int n 10;int arr [n]; }2.const不仅可以定义一个常量&#xff0c;也可以定义函数参数 例如&#xff1a; char *strcpy(char *dest…

一次弄懂gzip模块启用和配置指令

接下来所学习的指令都来自ngx_http_gzip_module模块&#xff0c;该模块会在nginx安装的时候内置到nginx的安装环境中&#xff0c;也就是说我们可以直接使用这些指令。 1. gzip指令&#xff1a;该指令用于开启或者关闭gzip功能 注意只有该指令为打开状态&#xff0c;下面的指令才…

联想凌拓 ThinkSystem DE 系列全闪存阵列

ThinkSystem DE 系列全闪存阵列 超高的性能&#xff0c;极具竞争力的价格 通过消除过量配置最大限度地提高效率&#xff0c;同时通过降低空间、电源和冷却要求来降低成本。 利用高级数据保护&#xff0c;在本地或从远距离上防止数据丢失和停机事件。 在模块化 2U 构建模块中…

ModStartBlog v7.1.0 ChatGPT支持,界面全新优化

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用&#xff0c;支持后台一键快速安装&#xff0c;让开发者能快的实现业务功能开发。 系统完全开源&#xff0c;基于 Apache 2.0 开源协议。 功能特性 丰富的模块市场&#xff0c;后台一键快速安装 …

【ChatGPT】ChatGPT-5 强到什么地步?

Yan-英杰的主页 悟已往之不谏 知来者之可追 C程序员&#xff0c;2024届电子信息研究生 目录 ChatGPT-5 强到什么地步&#xff1f; 技术 深度学习模型的升级 更好的预测能力 自适应学习能力 特点 语言理解能力更强 自我修正和优化 更广泛的应用领域 应用 对话系统 智能写作…

2.含电热联合系统的微电网运行优化(文章复现)

说明书 相关代码资源&#xff1a;基于多目标粒子群算法冷热电联供综合能源系统运行优化 基于拉丁超立方法的风光场景生成与削减 粒子群综合能源系统优化的matlab实现 智能微电网PSO优化算法 MATLAB代码&#xff1a;含电热联合系统的微电网运行优化 关键词&#xff1a;微网…