Flow-based grid submissions, aka Streaming CE

Currently, grid CEs based around technologies like Globus GRAM, CREAM, or HTCondor-C essentially are services that submit a job into a local site scheduler.

This page contains thoughts re a "Streaming CE" (pseudonyms incl "Pilot Factory", or "flow submission"). What we mean is a service that would submit flows of jobs; in the sense of "keep X jobs running at all times".

When combined with the concept of glideins/pilot jobs, the idea of a streaming CE is to transform the job interface provided by site schedulers into a "resource provisioning" interface. If you are asking to "keep X single core jobs running at all times", this is equivalent to saying "provision me X CPUs". The fact that I may need to move from one node to another is just a nuisance.


[this Requirements section is via emails written by Igor Sfiligoi]

Server side requirements

1) Design a resource provisioning service to be deployed at Grid sites. For purely explanatory reasons, an example provisioning request would be "I need X single CPU slots with 3G of memory". It is desirable that X could be arbitrarily large, but putting a limit on X, even as low as 1, is acceptable.

2) The resource provisioning does not need to be "all or nothing". It is acceptable for the service to provide only a subset of resources at any point in time.

3) The number of resources provisioned for the requester does not need to grow linearly. It is acceptable that some provisioned resources are taken away from the requester. It is desirable that the processes running are given sufficient advance notice.

4) The service must provide functionality at least equivalent to the current Grid CEs.

Given that, here is the set of features for such a service:

a) The service must provide an API for requesting N "resources". The service must also provide an API to describe the kind of resources. For purely explanatory reasons, this would be something like the Globus RSL. It is desirable that the user can also provide how "aggressive" the provisioning should be, although it is not required, Without being prescriptive, this could be the max number of started resources per minute. As a result, it returns a handle if the request for successful, or reliably indicates that there was an error and the request was not accepted. (See (1) on the limits of N)

b) Two accepted requests for resources result in two different, independent handles that are treated independently by the service.

c) Once a request is accepted and associated with a handle, the service will try to get the user the needed number of resources. For example, if a local batch system is used for this, this translates into submitting up to N jobs into it. A resource is considered provisioned, once a job starts running.

d) If a provisioned resource is taken away from the request associated with a user's handle, either because the user-provided process terminated or because the site decided to forcibly take it away, the service will automatically try to provision a new resource. For example, if a local batch system is used, soon after a running job terminates, the service will submit a new one.

e) The user must be able to increase the number of resources requested on an existing handle. The service must explicitly accept or deny the request, although an error message is acceptable and must be interpreted as a denial. Please notice that the service is always allowed to deny the request (for whatever reason, including the limits mentioned in (1))

f) The user must be able to reduce the number of resources requested on an existing handle. Furthermore, the user must be able to specify how to pick the victims. While I do not want to be prescriptive, the obvious choices are "NotRunning", "Shortest", "Longest". It is also desirable for the user to be able to specify the way the process is notified that it has to go away. Again, without being prescriptive, an obvious choice is to specify the signal number to be sent. Furthermore, it is desirable for the user to be able to specify if the processes on the victim resources should be allowed to go away on their own pace, or it the service should hard kill after a reasonable delta. The service must explicitly acknowledge the acceptance of the command, or reliably indicate there was an error and the request was not accepted. Please notice that there should never be a good reason for the service to deny the request. Moreover, it is acceptable to invalidate the user's handle if the accepted request was for 0 resources, although the service is not required to.

g) The user must be able to request the destruction of any existing handle. The net effect is that all provisioned resources are immediately hard killed and no new resources will ever be provisioned for this handle again. For example, if a local batch system is used, all jobs associated with that request will be removed from the queue.

h) The returned handle has a lifetime, or lease to it. The user must be able to check the remaining lease time at any time. The service must provide an API to extend the lease. If the user does not renew the lease before it expires, the service will cancel the lease with the same effects as if the user explicitly asked for the destruction of the handle. It is desirable that the user can specify the desired lease time, although the service has always the final word.

i) The user must be able to ask if a handle is still valid, and if it is, how many resources have been requested, how many are currently actually provisioned and how many waiting there are. The number of waiting is the number that, from the point of view of the service, is likely to be started in the near future. As an example, in a batch system, (provisioned+waiting) cannot exceed the total number of available resources. This number must also not include "problematic" jobs (e.g. Held in HTCondor). Please notice that the number of provisioned resources can be higher than the number of requested ones, due to a recent reduction request. It is desirable, but not required, to get even more information on the provisioning activity, like number recently started, number recently terminated, number internal submission errors, etc.

j1) Given a valid handle, the user can ask the service if there are any problems with it. Without being prescriptive, example answers would be "No problems", "Something is wrong with your request, failing on job submission" and "Your jobs are terminating way too fast (30s)" It is desirable that the errors are also machine parsable.

j2) Given a valid handle, the service must provide a list of ids for all the instances of resources that have ever terminated (after being started). Without being prescriptive, a id could be ${LocalQueueJobId}.${RestartNr). It is desirable that the service provides an option for the user to explicitly delete some ids, so they are not returned in further calls, but it is not strictly required. This would also delete any information/files associated with the id (see (k))

k) For each (handle,id) pair, the user must be able to request the termination code (errno), stdout and stderr of the run. The service is allowed to deny the request if it cannot fullfil the request; e.g. the logs were deleted due to disk space limits. It is desirable that the service provides an option for the user to explicitly delete stderr and stdout of any (handle,id) pair, but it is not strictly required. Being able to retrieve any other file produced by the process during the run is desirable, but not necessary.

l) All other features users expect of current Grid CEs, such as: l1) Different users expect to run processes under different security envelops, i.e. are protected from each other, if their provisioned resources happen to be on the same physical node. l2) The user must be able to specify the payload to be run on the provisioned resources. l3) The user must be able to safely upload the payload to a storage area managed by the service. The service must guarantee (as much as possible) that a payload associated with at least a valid handle is not lost or corrupted. It is desirable that the same payload can be shared among multiple handles, but is not required.

I know it is not a short list, but I do think all of them are needed.

Client side Requirements

Above was about what the server would need to provide to satisfy my needs. The question is now: What do I need to do on the client. And in particular, a specific client: HTCondor-G. So, how could HTCondor-G support this?

Here is my proposal:

I) From the user point of view, nothing changes. User still submits a job (or a bunch of jobs using queue X) at a time to HTCondor-G, as we have always done. Of course, we use a different Grid type.

II) HTCondor-G internally clusters the jobs based on the kind of resources requested (see (a)).

III) If there is at least one uniform cluster (as in (II)) without a valid "stream handle" (see (a) + (l)), HTCondor-G will contact the Provisioning Service and obtain one. The initial request is for len(cluster) resources.

IV) HTCondor-G now continuously monitors the progress of the request (see (i)). HTCondor-G is free to label any local job as running, as long as the sum of them is the same as the number obtained from the Service, and it is deterministic. Similarly, HTCondor-G should label the right number of local jobs as "Waiting"... i.e. using an appropriate GridStatus attribute. All the other jobs should either be labeled as "Unsubmitted" or they should be held (this gets possibly a bit hairy, but see (j1))

V) If a user submits another job with the same kind of resources, HTCondor-G will increase the number requested on the existing handle (see (e)).

VI) If a user removes a job from the queue, HTCondor-G will decrease the number requested on the existing handle (see (f)). If the job was idle at the time, the victim type is "NotRunning". If it is running, I guess the right type should be set in the config, together with the aggressiveness. While it may be interesting to be user specified, it may be too complicated... but I am not ruling it out, if you guys want to do it.

VII) HTCondor-G will continuously probe the Service for newly finished "handle ids" (see (j2)). For any new such id, it must retrieve the outputs, if at all possible (see (k)). These outputs must be associated with one of the jobs. How HTCondor-G picks which jobs gets it, I don't really care... whatever works for you. We would also likely need to agree on some sort of convention on the file naming, and what to do with things like exit codes.

IX) Once the last local job associated with a valid handle has been removed, and all the output files have been retrieved, HTCondor-G destroys the handle.

PS: I have not explicitly listed the renewal of the lease, but I expect HTCondor-G to take care of that, too.

Notes re Miron and Igors Streaming CE Concerns

[from Miron's visit to San Diego March 2012]

Before going in streaming CE details, let me just point out that

  1. The vanilla universe is already doing "streaming"!
  2. Jobs can restart multiple times, due to preemption. HTCondor is just not making restarts a first-class citizen.
  3. Most of what we describe below would actually make sense in the vanilla universe as well.

Miron's main concern is how we handle edge cases; everything else is easy The two main problems we spotted:

  1. What to do if the client goes away?
    • - We of course have a lease. But one may not be enough.
    • - We want the classic lease with the semantics of "If you don't hear from me before it expires, get away from me all the resources and clean up after me"
    • - However, that lease must be long, to prevent resources to be de-provisioned for short outages at client (say, 1 or 2 days)
    • - If the client is down, however, the provisioning of new resources may not work either (i.e. jobs fail). For example, the client had a Web server that is now down, and that is used during job startup (glideinWMS case).
    • - So we want at least another lease, with the semantics "If you don't hear from me before it expires, do not provision new resources (but do keep the existing ones)" This one can be reasonably short lived (tens of minutes-hours).
    • - Anything else?
    • - PS: Client should provide the desired lease time(s), but server must be able to shorten them, if so desired (-> notify client)
  2. What to do if all/most jobs finish at the same time and all restarts fail?
    • - Unmanaged, this would put a high load on site batch system! - Please notice this is currently not a problem for glideinWMS factory, because of the "slowness" of the CE
    • - Putting limits on restarts of a single job may not be enough.
    • - One job out of 1k restarting once every 10 mins is OK (e.g. broken WN).
    • - 1k jobs all restarting every 10 mins is not OK (over 1Hz)
    • - We need to correlate the various requests as much as possible, and have aggregate limits
    • - This goes against the idea of "one HTCondor-G job per request"
    • - I have no obvious solution right now

Then there are "easier" problems:

  1. Do we really want to "fly blind" and treat any "resource provisioning job" termination as equal?
    • - We should probably natively distinguish at least
      • + Failed at initialization (/validation/...)
      • + Failed, but after doing some useful work
      • + Was never claimed
      • + Right now no work for me (but did some useful work before)
      • + No problems (just being nice to the site so it can handle fair share)
      • + Anything else (what just happened here????)
    • - Would need to standardize how we convey this
      • + Is exit code the right thing?
      • + Just first approximation (e.g. No problem/problem)? And we have a different mechanism for details?
      • + Should we go even more fancy, and have actual complex policies? (I am not advocating it for "HTCondor-G streamin", but it came up in the passing and may make sense in the generic vanilla universe)
    • - Once we know the above, we should probably throttle restarts of anything but "No problems"
  2. Restart limits (related to above)
    • - Even for "No problems", we want the client to provide a max restart rate (e.g. no more than 1 per hour)
      • + To deal with bugs in the pilot code itself
    • - But we want the client to provide the limits for the other use cases as well
      • + Should we have one for each of the above categories?
      • + Or just one "problem limit"?
      • + What is a reasonable limit? e.g. No more than 1 every 20 mins? (this is fine for "broken node" handling, but maybe not for e.g. "Was never claimed")
    • - And let's not forget the "group throttling "described above
    • - PS: Client should provide the desired limits, but server must be able to change them, if so desired (-> notify client)
  3. What about getting back the output sandboxes?
    • - Regarding reliability
      • - Igor has always insisted on getting all the output sandboxes back, because of the valuable monitoring information in them. After some grilling by Miron, Igor had to admit this is not completely true; what Igor really want to get "most of them" back. I.e. he can live with a small loss of output sandboxes (say 1%), as long as it is truly random, but wants to avoid losing most of the outputs for certain events (of my jobs), even if they are relatively rare.
    • - Obvious problem is, what to do when the server gets out of space (or quota)
      • - Should it stop starting new jobs, or should it throw away the oldest sandboxes?
      • - The client should have a say about this.
    • - However, it may get more interesting than this
      • - Do we want to prioritize deletion based on the "termination mode"?
      • - e.g. Maybe we are willing to loose a fraction of "Was never claimed" sandboxes, but really want all "No problem" ones back. Or maybe our tolerance for the fraction we can loose of the different kind is different.
    • - BTW: The client may have a similar problem on its own end, but I am not sure this is relevant in this context.

Finally, Miron wanted to see all the above discussed/digested in the "client land", i.e. how we express all of the above, before even attempting to go into how we express this in a RPC-like protocol.

A Streaming CE architecture based on HTCondor-C, JobRouter

Ideas from Brian: https://docs.google.com/document/d/16HVDBLjAF5li42kue2us1SDvU1fNZfQicmgISn7VMls/edit