
Job completion notification #93

Open
justinclift opened this issue Oct 5, 2023 · 19 comments
Labels
enhancement New feature or request

Comments

@justinclift
Contributor

I'm trying to figure out how the code that enqueues a job gets notified when the job has been completed, so the task that enqueued the job knows to look for a response (e.g. in the backend database or wherever).

But I'm not seeing any info about it, and the channel pieces in the examples seem to be there more to keep the examples from quitting than anything else.

Any ideas? 😄

@acaloiaro
Owner

acaloiaro commented Oct 5, 2023

As it stands, there is no "notification" functionality built in.

With that said, the chan mechanism from the examples is safe to use in situations where Start/StartCron run in the same memory space/process as Enqueue. If Enqueue and Start* are in separate processes, this is not viable. The obvious example where this works is the in-memory backend, although it works with the Postgres and Redis backends as well, so long as Enqueue and Start* are called in the same process. That doesn't lend itself to distributed computing, but it does enable asynchronous job processing.

For situations in which you're using the PG or Redis backends in separate processes, I would point you toward PG's LISTEN/NOTIFY, or Redis' pub/sub or streams, in Handler code.


If neoq had some sort of first-party "notification" system, what would you envision that looking like? What do you consider a notification? Would notification payloads be arbitrary, similar to job.Job, or are they more structured with a "status" concept, and more?

@justinclift
Contributor Author

justinclift commented Oct 5, 2023

For my use cases, the Start() and Enqueue() code runs on different machines.

e.g. a web interface or API server accepting requests from users, then distributing tasks (via Enqueue()) to workers on various other machines (with handlers set up using Start()) that do the requested tasks.

Example tasks that get distributed are:

  • create database XYZ
  • run query SELECT bar FROM baz on [some database]
  • execute statement CREATE TABLE ... on [some other database]
  • execute statement UPDATE TABLE ... on [another database]
  • delete database ABC

I was kind of expecting some notification mechanism to be available for jobs after they've been enqueued. Along the lines of:

jobID, err := nq.Enqueue(context.Background(), j)
if err != nil {
    log.Fatalf("Unable to enqueue job: %s", err)
}
jobID.Wait() // Waits for the job to complete before proceeding

Or alternatively, maybe a new method that's more synchronous?

_, err := nq.Execute(context.Background(), j) // Enqueue the job, and wait for completion
if err != nil {
    log.Fatalf("Unable to execute job: %s", err)
}

Does that make sense? 😄

@acaloiaro
Owner

Yeah what you describe makes sense. It's akin to calling Sidekiq's perform, if you're familiar with that library.

Let me give it some thought. To set expectations: I'm not sure yet that this is something I want for neoq. It may be, but I don't want to rush the decision.

The most likely implementation would be something akin to your latter example, but it would return a channel and an error, e.g.

done, err := nq.Execute(context.Background(), j)
select {
case job := <-done:
	if job.Status == "error" {
		...
	}
case <-timeoutChan:
	// timed out waiting for job to complete
}

I just need to consider whether this is a use case that I'd like neoq to support.

@justinclift
Contributor Author

Cool, if it's a use case that you reckon Neoq should support, then I'm happy to try out potential implementations as you come up with them. 😄

justinclift added a commit to sqlitebrowser/dbhub.io that referenced this issue Oct 8, 2023
This is unfortunate, as without this capability Neoq will be
unusable for us:

  acaloiaro/neoq#93

Taking another look at the Neoq project and docs, this lack
doesn't seem to be clearly documented either.

Not sure if completion notification capability should be
expected in a job scheduling library though, as the only other
one I'm somewhat familiar with is RabbitMQ (which has it).

Neoq's job fingerprinting capability may cause problems for
us too later on, if that can't be disabled. We definitely
don't want to stop people running a particular task more than
once (aka "increment value foo", "create database abc", etc).
@justinclift
Contributor Author

@acaloiaro What are your thoughts on this so far? 😄

@acaloiaro
Owner

acaloiaro commented Oct 15, 2023

Hey @justinclift, I'm still mulling this one over and I think it's a decent idea. I'm more inclined to add it than not add it.

With that said, "it" will probably not be exactly what you've described here. "It" will likely be a generic mechanism for receiving job updates. Something along the lines of jobs.StatusListener(jobID int64) (c chan jobs.Job, err error).

What I'm somewhat torn about here is that I think (c chan jobs.Job, err error) would be a good return signature for Start(), but that means a major API-breaking change. So I'm being cautious at the moment and not rushing to a decision.

In the mean time, if you're using the Postgres backend, you can cobble together a job status watcher yourself by polling the neoq_jobs table for changes in the status of jobs. Here's a rough cut version:

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	var jobStatus string
	for {
		// Note: Postgres placeholders are $1 rather than ?, and pgx's QueryRow takes a context
		err := p.pool.QueryRow(ctx, "SELECT status FROM neoq_jobs WHERE id = $1", jobID).Scan(&jobStatus)
		if err != nil {
			log.Printf("unable to check job status: %v", err)
		}
		if jobStatus == "processed" {
			break
		}
		<-ticker.C
	}

I used a polling mechanism here, but a much more resource-friendly version would be a channel fed by LISTEN job_status_updates, with your Handlers issuing NOTIFY job_status_updates, '&lt;job id&gt;' on completion.

That way, any time a new notification comes over the job_status_updates channel from PG, your application can grab the job from neoq_jobs by ID and examine it for updates to status and/or error.

@justinclift
Contributor Author

Thanks, I'll probably try that out some time in the coming week. Was pointed to a few other things that need investigation first (unrelated to this) though.

... would be a good return signature for Start(), but that means a major API-breaking change.

Instead of an API-breaking change, maybe add a new function?

Start2() doesn't sound like great naming 😉, but maybe something like StartWithNotify() (etc)?

@acaloiaro
Owner

acaloiaro commented Oct 15, 2023 via email

@justinclift
Contributor Author

Thanks, that sounds reassuring. 😄

@acaloiaro acaloiaro added the enhancement New feature or request label Oct 24, 2023
@acaloiaro
Owner

This will likely be one of the next items I pick up to work on @justinclift. Do you have anything to add since we last discussed it?

@justinclift
Contributor Author

Nope, but am looking forward to trying this out. 😄

@justinclift
Contributor Author

justinclift commented Dec 11, 2023

As a data point, I took some time over the last week to implement a job queue system with completion notification for my main focus project, basing the first pieces on your blog post. 😄

It's now running in production, but no idea (yet) how stable it is in the real world. In theory (!) it should be ok, but time will tell. 😄

@justinclift
Contributor Author

justinclift commented Dec 11, 2023

The hardest piece (for me) was figuring out how to implement a system that returns responses to callers.

At first I wasn't sure how to do it, but after trying a few things out I went with a system that uses goroutines and channels:

https://github.com/sqlitebrowser/dbhub.io/blob/master/common/postgresql_live.go#L542-L570

The underlying data type used to distribute responses to the matching caller:

https://github.com/sqlitebrowser/dbhub.io/blob/master/common/live_types.go#L99-L127

The matching code that uses this data structure:

https://github.com/sqlitebrowser/dbhub.io/blob/master/common/live.go#L686-L707

(called automatically from here)

It's probably not the greatest approach, nor the greatest code, but it might be useful food for thought. 😄

@acaloiaro
Owner

acaloiaro commented Dec 11, 2023

Thanks for sharing that @justinclift. This looks pretty neat, and similar to what I have in mind. I definitely want neoq's implementation to be channel-based, and likely generic-based as well.

Was there anything that stood out to you as particularly difficult to get right when implementing your solution?

I haven't read through all your code yet, but I assume there's a channel-per-synchronous call. Is there anything in place to prevent the open channel list from monotonically growing? I'd like this feature to be resource-friendly so users can place some constraints on waiting for jobs to complete. Maybe deadlines for the channel(s) to produce results.

I've been working on commercial software the last couple of weeks (that uses neoq under the hood), but I do plan on coming back to this.

Unfortunately my commercial endeavors don't (yet) have any queue-and-wait needs, which is why I haven't gotten around to this. Would be happy to accept contributions if anyone is interested.

@justinclift
Contributor Author

justinclift commented Dec 11, 2023

Is there anything in place to prevent the open channel list from monotonically growing?

Nah. Our infrastructure doesn't have a huge amount of load, so performance optimisations are just "nice" rather than a requirement.

If the load starts growing a lot then of course that'll need to be re-visited. 😄


The main piece our code uses to direct responses to callers is a simple map:

ResponseTargets := make(map[int]*chan ResponseInfo)

As each job gets a job_id allocated to it upon submission to the database (INSERT INTO job_submissions ... RETURNING job_id), that map is handed the job_id and matching channel for sending the response to.

(The map is mutex guarded with appropriate Lock()/Unlock() calls for safety, and the approach seems fine for our purposes thus far.)

When the database connection receives a NOTIFY, it grabs the details (from the database), looks up the matching job_id in the map mentioned above, then forwards the details of the response to the caller's channel (and removes the caller entry from the map, as it won't be needed again).


I'd like this feature to be resource-friendly so users can place some constraints on waiting for jobs to complete. Maybe deadlines for the channel(s) to produce results.

That makes sense. I don't really grok how to use contexts properly yet (likely just needing some thought and practise to fix that), so everything that uses them (for me) just uses context.Background. So far. At some point I'll get around to understanding them better, but nothing I've worked on has really needed it yet. 😄

@acaloiaro
Owner

That makes sense. I don't really grok how to use contexts properly yet (likely just needing some thought and practise to fix that), so everything that uses them (for me) just uses context.Background. So far. At some point I'll get around to understanding them better, but nothing I've worked on has really needed it yet. 😄

@justinclift I'm happy to share my Context mental model with you if you email!

@justinclift
Contributor Author

Thanks. I'll do that, although not today. 😄

@justinclift
Contributor Author

Just stumbled over a potential gotcha / food-for-thought while looking into potentially implementing two-phase commit using my above approach.

It looks like PREPARE TRANSACTION is allergic to LISTEN/NOTIFY. From the docs:

It is not currently allowed to PREPARE a transaction that has executed any operations involving temporary tables or the session's temporary namespace, created any cursors WITH HOLD, or executed LISTEN, UNLISTEN, or NOTIFY

Not yet sure if it'll be a real problem, but it's unexpected (by me at least). 😉

@acaloiaro
Owner

acaloiaro commented Dec 18, 2023 via email

@acaloiaro acaloiaro changed the title Job completion notification? Job completion notification Jan 28, 2024