Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC/WIP : Extensible remote synchronization objects #5817

Closed
wants to merge 4 commits into from
Closed

RFC/WIP : Extensible remote synchronization objects #5817

wants to merge 4 commits into from

Conversation

amitmurthy
Copy link
Contributor

This PR replaces #5791 and make it possible to extend the types of process synchronization facilities that can be provided.

Consequently, channels(), tspaces() and kvspaces() as implemented previously in #5791 are now available as an external package - https://github.com/amitmurthy/SyncObjects.jl .

The user needs to provide a concrete implementation of AbstractRemoteSyncObj with so an object of type SyncObjData, and the following callbacks : cantake, canput, fetch, put, take and query

For example type RemoteChannel used to provide a channel functionality is defined in https://github.com/amitmurthy/SyncObjects.jl as

type RemoteChannel <: AbstractRemoteSyncObj
    q::Vector
    sz

    so::SyncObjData
    cantake::Function
    canput::Function
    fetch::Function
    put::Function
    take::Function
    query::Function

    RemoteChannel(T, sz) = new(Array(T, 0), sz, SyncObjData(), rccantake, rccanput, rcfetch, rcput, rctake, rcquery)
end

The actual remote object is created using a new method syncobj_create(pid, T::Type, args...) where T is the type of synchronization object being created.

As previously stated this is an alternative approach to having native Channels support in Julia (as in #5757).

@amitmurthy
Copy link
Contributor Author

The last update makes RemoteRef is a parametric type that now carries information about what type of object it refers, i.e RemoteRef{T} where T can be one of RemoteValue (in base) or RemoteChannel, RemoteTSpace or RemoteKVSpace (via SyncObjects.jl).

take!, put! and fetch accept a keyword arg timeout that allows for a timed wait on a RemoteRef

@amitmurthy
Copy link
Contributor Author

Using the latest update:

put! on channles, tspaces and kvspaces, now also accepts a keyword argument expire which results in the value deleted from the space if it is not taken within the specified time. A recurring timer on these spaces performs housekeeping tasks at regular intervals

A query callback on the containers has been added as a hook for other extensible functionality on the containers themselves. For example, length on RemoteRefs of types channels, tspaces and kvspaces returns the number of objects present in these spaces.

@amitmurthy
Copy link
Contributor Author

As an simple example of using tspaces, DB queries could be executed out-of-process - since the typical implementation ends up blocking the main julia thread - with something like :

pids = addprocs(8)  
using SyncObjects
ts = tspace()

@everywhere begin
    function dbworker(ts, dsn)
        conn = DB.connect(dsn)
        while true
            r = take!(ts, dsn)
            respref = r[2]
            querystr = r[3]

            put!(r[2], DB.query(conn, querystr))
        end
    end
end

# start the DB worker pool
for p in pids
    @spawnat p dbworker(ts, "mydb")
end

# out-of-process db query execution - these can be called concurrently from any number of 
# tasks - they will not lock up the calling process.
rr = RemoteRef()
put!(ts, ("mydb", rr, "select * from mytable"))
result = take!(rr)

As a test, I tried out 10,000 concurrent tasks waiting on 8 external workers to finish an equal number of tasks, code : https://gist.github.com/amitmurthy/9089918 . The scheduler didn't miss a beat - sailed through!

@amitmurthy
Copy link
Contributor Author

Folks, would like some feedback on this functionality.

cc: @JeffBezanson

@StefanKarpinski
Copy link
Member

I have some rather general reservations about timeout/expiration-based interfaces. They strike me as never what you really want. Let me explain. In this case, why not simply remember how old each entry is and eject the oldest entries to make room for newer ones only when you need to? Is there any advantage to ejecting entries after a fixed amount of time?

Likewise, when doing network programming with timers, rather than setting somewhat arbitrary timeouts at each level, what you really want is the ability to interrupt tasks from the outside. Consider an I/O task with six steps that you want to ensure the completion of in under a minute. If you put a timeout of 10 seconds on each step, that certainly ensures that the whole thing completes in under a minute. But what if one of the steps takes 11 seconds and the other steps finish in under a second? The total time would be no more than 16 seconds, but it will still timeout. To allow every possible way of finishing the task in under a minute, you have to give the first task a timeout of 60 seconds, then give the next task a timeout for what remains of the total 60 seconds, and so on. This is doable, but it's a huge pain – the programmer spends a huge amount of their code dealing with timeouts that just aren't the most important thing. It would be much nicer to have a supervising task that can simply wait 60 seconds while the I/O task does it's thing – completely ignorant of timeouts – and if the I/O task isn't done after 60 seconds, the supervising task just kills it and deals with the fact that it took too long. This is a vastly better API: the main I/O code includes no mention of any timeouts. Moreover, separating the timeout logic from the main I/O code, means that you can add the timeout handling easily after the fact instead of having to go through the main I/O logic and retrofit it with all this tedious timeout logic. To me, timeouts and expirations vs. interruptible tasks is very much like C-style error handling vs. exceptions, except possibly an even bigger usability difference.

I know this expiration business is not quite I/O timeout programming, so this rant is slightly off-topic, but this has been weighing on my mind for quite a while, so I thought I'd express it.

@JeffBezanson
Copy link
Member

I very much agree with @StefanKarpinski here. And we already have the ability to do what he describes; a supervisor task can sleep for 60 seconds, and if the target task is still waiting it can be woken with an exception. wait will then remove it from the queue of whatever it's waiting for, and normal exception handling will take over.

@amitmurthy
Copy link
Contributor Author

@StefanKarpinski

Is there any advantage to ejecting entries after a fixed amount of time?

Two things come to mind when building a job distribution engine using a tspace:

  1. A job may be irrelevant if not executed within a particular time - the submitter task may have gone away. The expiration field helps in preventing such unnecessary execution.
  2. Reduce memory consumption. Since the objects in the tspace and kvspace will never get gc'ed, we simply do not have to keep them around longer than necessary. I could use the kvspace to build a memcached like service, which would use an LRU model to clean-up entries like you suggested, but there are other use cases where object expiration makes sense.

supervising task that can simply wait 60 seconds

Since none of the wait calls support a timeout, it can't do this today without setting up a Timer and waiting on a condition variable, that is set off either by the timer or the IO task upon completion. This implementation does the same thing internally. Or by implementing the model suggested by Jeff.

To allow every possible way of finishing the task in under a minute, you have to give the first task a
timeout of 60 seconds, then give the next task a timeout for what remains of the total 60 seconds, and
so on. This is doable, but it's a huge pain – the programmer spends a huge amount of their code dealing
with timeouts that just aren't the most important thing

Not really, consider a web front-end in Julia that spawns off tasks (each then spawns its dependencies) for each request. All we do is set 60 seconds at each step including the outer one. The user on his browser gets a nice error message withing 60 seconds, while each sub-task does not wait forever in the server, each cleans itself up in 60 seconds.

...separating the timeout logic from the main I/O code...

This would be a great thing to do, but does not preclude supporting timeouts in wait for use cases which require it.

@JeffBezanson

a supervisor task can sleep for 60 seconds, and if the target task is still waiting it can be woken with an exception.

This is an implementation detail. For a web front-end we could literally have hundreds of concurrent tasks. We don't want an API where the programmer first has to register with a supervisor with the timeout value required instead of specifying it in the wait call itself. You would want a single supervisor task supervising all such concurrent tasks in the system, so sleep 60 won't do, the supervisor task will have to check every second and wake up the timed out tasks. This is an implementation detail - single supervisor vs timers, but the wait call could still accept the timeout value.

@amitmurthy
Copy link
Contributor Author

Would also like your thoughts on the general idea of this PR, i.e., user definable synchronization types, like has been done to construct channels, tspaces and kvspaces in a separate package.

@amitmurthy
Copy link
Contributor Author

Closing as same functionality has been implemented in package https://github.com/amitmurthy/MUtils.jl

@amitmurthy amitmurthy closed this Mar 11, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants