Discussions related to Visual Prolog
Martin Meyer
VIP Member
Posts: 354
Joined: 14 Nov 2002 0:01

Thread terminates listener

Post by Martin Meyer »

Hello Thomas,

In a class that does not construct objects, I allocate one buffer for each thread that uses the class. The buffers contain some collections and are stored in a red-black tree in a class fact:

Code:

domains
    buf = buf(setM{unsigned} SetM, mapM{unsigned, string} MapM).

class facts
    bufTree : redBlackTree::tree{multiThread_native::threadID ThreadId, buf Buf} := redBlackTree::emptyUnique() [immediate].

When a thread terminates, I want to release its buffer automatically. One idea I had for doing this was:

Code:

class predicates
    getBuf : () -> buf Buf.
clauses
    getBuf() = Buf :-
        ThreadHandle = multiThread_native::getCurrentThread(),
        ThreadID = multiThread_native::getThreadID(ThreadHandle),
        if Buf = redBlackTree::tryLookUp(bufTree, ThreadId) then
        else
            Buf = buf(setM_redBlack::new(), mapM_redBlack::new()),
            bufTree := redBlackTree::insert(bufTree, ThreadId, Buf),
            WeakRef = weakReference::new(ThreadHandle),
            WeakRef:released:addListener({ :- bufTree := redBlackTree::deleteAll(bufTree, ThreadID) })
        end if.

But I suspect that this does not work because the memory pointed to by ThreadHandle is not disposed through VIP's garbage collector. Right?

If that does not work, how can I release the buffers automatically?
Regards Martin
Thomas Linder Puls
VIP Member
Posts: 1466
Joined: 28 Feb 2000 0:01

Re: Thread terminates listener

Post by Thomas Linder Puls »

Yes, you are right: that won't work.

A "ThreadHandle" is not under GC control, as you say. Furthermore, you should notice that the released event on weakReference is only fired when there is an attempt to reference the data after it has been released; it does not automatically fire when the data is released.

Anyway, since the handle is not GC-managed memory at all, the event will never fire.

The only way I can think of is to use a try-finally:

Code:

clauses
    startMyThread(Task) :-
        _ =
            thread::start(
                {  :-
                    try
                        Task()
                    finally
                        bufferClass::cleanUp()
                    end try
                }).

But that requires that the threads are started in a special way (which may not be "nice").
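
A minimal sketch of what bufferClass::cleanUp could look like, assuming it lives in the class that owns the bufTree fact from the first post. The predicate name comes from the call above; its body is an assumption built only from calls that already appear in this thread, and it has not been compiled.

```visual-prolog
% In the class declaration of bufferClass (class name taken from the call above):
predicates
    cleanUp : ().

% In the implementation of bufferClass (hypothetical body):
clauses
    cleanUp() :-
        % Runs inside the terminating thread, so the current thread ID identifies its buffer.
        ThreadHandle = multiThread_native::getCurrentThread(),
        ThreadId = multiThread_native::getThreadID(ThreadHandle),
        % Same update style as the first post; not thread-safe as written.
        bufTree := redBlackTree::deleteAll(bufTree, ThreadId).
```

Because cleanUp identifies the buffer by the calling thread's ID, it only removes the right entry when it is called from the thread being cleaned up, which is what the finally block ensures.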

You should also be aware that your bufTree is handled in a non-thread-safe way. To be safe, I suggest you use a mapM_redBlack_cas for this purpose instead.
Regards Thomas Linder Puls
PDC
Thomas Linder Puls

Re: Thread terminates listener

Post by Thomas Linder Puls »

Notice that multiThread_native::getCurrentThread returns a pseudo handle. See the documentation of the Windows GetCurrentThread function.
Regards Thomas Linder Puls
PDC
Martin Meyer

Re: Thread terminates listener

Post by Martin Meyer »

Thanks for pointing me to mapM_redBlack_cas!

I have noticed that there is also a class mapM_redBlack_block. Is that class also thread-safe? In which situations do I have to use the .._cas class, and in which the .._block class?

I tried another solution to release the buffers automatically: I am running a suspending predicate in the background that checks every three minutes for terminated threads and releases their buffers. Do you think it could work as below?

Code:

class myClass

predicates
    releaseInactiveBuffers : ().

    ...

end class myClass

%---

implement myClass
    open core

domains
    buf = buf(arrayM{unsigned} ArrayM_a, arrayM{unsigned} ArrayM_b).

class facts
    bufMapM : mapM{windowsApi::threadHandle ThreadHandle, buf Buf} := mapM_redBlack_cas::new() [immediate].

class predicates
    init : () [classInitializer].
clauses
    init() :-
        _Future = pfc\asynchronous\future::submit_unit({  :- releaseInactiveBuffers_loop() }).

class predicates
    releaseInactiveBuffers_loop : () suspending.
clauses
    releaseInactiveBuffers_loop() :-
        programControl::sleep(3 * 60 * 1000),
        releaseInactiveBuffers(),
        releaseInactiveBuffers_loop().

clauses
    releaseInactiveBuffers() :-
        foreach ThreadHandle = bufMapM:getKey_nd() do
            if multiThread_native::waitForSingleObject(ThreadHandle, 0) <> multiThread_native::wait_timeout then
                bufMapM:removeKey(ThreadHandle)
            end if
        end foreach.

class predicates
    getBuf : () -> buf Buf.
clauses
    getBuf() = Buf :-
        ThreadHandle = multiThread_native::getCurrentThread(),
        if Buf = bufMapM:tryGet(ThreadHandle) then
        else
            Buf = buf(arrayM::newExpandable(0), arrayM::newExpandable(0)),
            bufMapM:set(ThreadHandle, Buf)
        end if.

    ...

end implement myClass

Regards Martin
Martin Meyer

Re: Thread terminates listener

Post by Martin Meyer »

Ah, the pseudo handle is a constant value that is the same for all threads, so I cannot use it as a unique identifier for the threads.

But then the question becomes how to get the real handle of the current thread. Or, if I use the thread IDs as unique identifiers, how can I determine whether a thread with a certain ID has terminated?
Regards Martin
Martin Meyer

Re: Thread terminates listener

Post by Martin Meyer »

I thought it would be a smart decision to make my class a module that does not create objects. But now that I am facing the difficult consequences, I have realized it is a bad design.

I am going to change my class to a class that creates instances. Buffers will then be disposed naturally along with the class instances.

Still, I would like to know more about mapM_redBlack_cas vs. mapM_redBlack_block so I can get an idea of when to use which of the two.
Regards Martin
Thomas Linder Puls

Re: Thread terminates listener

Post by Thomas Linder Puls »

Regarding _cas and _block I chose to write something in the wiki: Thread safety in Visual_Prolog/mapM and setM.

A little note: "sleep" is a blocking operation, and in a suspending predicate it is much better to use the non-blocking yield predicate (in pfc\asynchronous\executionContext):

Code:

    ...,
    yield(timeoutAfter(3 * oneMinute))

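
For illustration, the polling loop from the earlier post might be rewritten along these lines. This is an untested sketch that assumes yield, timeoutAfter and oneMinute are in scope exactly as written in the snippet above; releaseInactiveBuffers is the predicate from that earlier post.

```visual-prolog
class predicates
    releaseInactiveBuffers_loop : () suspending.
clauses
    releaseInactiveBuffers_loop() :-
        % Non-blocking wait: suspends this predicate instead of blocking the executing thread
        % (replaces programControl::sleep(3 * 60 * 1000)).
        yield(timeoutAfter(3 * oneMinute)),
        releaseInactiveBuffers(),
        releaseInactiveBuffers_loop().
```
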
Furthermore, now that I have actually figured out how to solve your problem, I think I will write it here even though you may pursue another solution. After all, it may give you some valuable information.

You can obtain the "real" handle by calling multiThread_api::mkNonInheritable on the pseudo handle (while in the right thread). The obtained handle must be closed when it is no longer needed.

Using suspending code, you can wait for the thread to terminate and then delete the buffer and close the thread handle. All in all it can look like this:

Code:

class predicates
    getBuf : () -> buf Buf.
clauses
    getBuf() = Buf :-
        ThreadHandle = multiThread_native::getCurrentThread(),
        ThreadID = multiThread_native::getThreadID(ThreadHandle),
        if Buf = bufferMap:tryGet(ThreadId) then
        else
            Buf = buf(setM_redBlack::new(), mapM_redBlack::new()),
            bufferMap:set(ThreadId, Buf),
            ActualThreadHandle = multiThread_api::mkNonInheritable(ThreadHandle, :CloseOriginal = false),
            SyncObject = syncObject::new(ActualThreadHandle, :NeedRelease = false),
            % we close the object explicitly below
            _Future =
                pfc\asynchronous\future::submit_unit(
                    {  :-
                        _ = SyncObject:waitAsync(),
                        SyncObject:close(),
                        bufferMap:removeKey(ThreadID)
                    },
                    :Ctx = pfc\asynchronous\executionContext_pool::defaultPool)
        end if.

Disclaimer: I have not tried the code (I just assume it will work :-)).
Regards Thomas Linder Puls
PDC
Martin Meyer

Re: Thread terminates listener

Post by Martin Meyer »

Thanks, Thomas, for the enlightening information!

I have tested your code and it worked well. My test:

Code:

class myClass

domains
    buf = buf(arrayM{unsigned} ArrayM_a, arrayM{unsigned} ArrayM_b).

predicates
    getBuf : () -> buf Buf.

properties
    bufMapM : mapM{multiThread_native::threadId ThreadId, buf Buf}.

end class myClass

%---

implement myClass

class facts
    bufMapM : mapM{multiThread_native::threadId ThreadId, buf Buf} := mapM_redBlack_cas::new() [immediate].

clauses
    getBuf() = Buf :-
        ThreadHandle = multiThread_native::getCurrentThread(),
        ThreadID = multiThread_native::getThreadID(ThreadHandle),
        if Buf = bufMapM:tryGet(ThreadId) then
        else
            Buf = buf(arrayM::new(0), arrayM::new(0)),
            bufMapM:set(ThreadId, Buf),
            ActualThreadHandle = multiThread_api::mkNonInheritable(ThreadHandle, :CloseOriginal = false),
            SyncObject = syncObject::new(ActualThreadHandle, :NeedRelease = false),
            % we close the object explicitly below
            _Future =
                pfc\asynchronous\future::submit_unit(
                    {  :-
                        _ = SyncObject:waitAsync(),
                        SyncObject:close(),
                        bufMapM:removeKey(ThreadID)
                    },
                    :Ctx = pfc\asynchronous\executionContext_pool::defaultPool)
        end if.

end implement myClass

%======

implement main

clauses
    run() :-
        _Future1 =
            pfc\asynchronous\future::submit_unit(
                {  :-
                    programControl::sleep(1000),
                    _Buf = myClass::getBuf()
                }),
        _Future2 =
            pfc\asynchronous\future::submit_unit(
                {  :-
                    programControl::sleep(4000),
                    _Buf = myClass::getBuf()
                }),
        _Future3 =
            pfc\asynchronous\future::submit_unit(
                {  :-
                    programControl::sleep(7000),
                    _Buf = myClass::getBuf()
                }),
        watchLoop(0).

class predicates
    watchLoop : (unsigned Num).
clauses
    watchLoop(Num) :-
        stdIO::write(Num, " seconds: "),
        stdIO::present(myClass::bufMapM),
        stdIO::nl(),
        programControl::sleep(1000),
        watchLoop(Num + 1).

end implement main

The output was:

Code:

0 seconds: []
1 seconds: [26244 -> buf([], [])]
2 seconds: [26244 -> buf([], [])]
3 seconds: [26244 -> buf([], [])]
4 seconds: [20260 -> buf([], []), 26244 -> buf([], [])]
5 seconds: [20260 -> buf([], []), 26244 -> buf([], [])]
6 seconds: [20260 -> buf([], []), 26244 -> buf([], [])]
7 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
8 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
9 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
10 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
11 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
12 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
13 seconds: [20260 -> buf([], []), 22548 -> buf([], []), 26244 -> buf([], [])]
14 seconds: [22548 -> buf([], [])]
15 seconds: [22548 -> buf([], [])]
16 seconds: [22548 -> buf([], [])]
17 seconds: [22548 -> buf([], [])]
18 seconds: [22548 -> buf([], [])]
19 seconds: [22548 -> buf([], [])]
20 seconds: [22548 -> buf([], [])]
21 seconds: [22548 -> buf([], [])]
22 seconds: [22548 -> buf([], [])]
23 seconds: [22548 -> buf([], [])]
24 seconds: []
25 seconds: []
26 seconds: []
...

Regards Martin
Thomas Linder Puls

Re: Thread terminates listener

Post by Thomas Linder Puls »

Excellent :-)

(Did you notice what I wrote about sleep -> yield?)
Regards Thomas Linder Puls
PDC