Login| Sign Up| Help| Contact|

Patent Searching and Data


Title:
SYSTEM AND METHOD FOR SYNCHRONIZING DISTRIBUTED COMPUTING RUNTIMES
Document Type and Number:
WIPO Patent Application WO/2016/186531
Kind Code:
A1
Abstract:
The invention provides a computing system (100), comprising a plurality of cluster computation nodes (101, 102, 103), a hybrid parallel computation runtime (104) comprising at least two distributed computing runtimes (105, 106) configured for parallel computations on the plurality of cluster computation nodes (101, 102, 103), wherein the hybrid parallel computation runtime (104) is configured to operate on data distributed to the plurality of cluster computation nodes (101, 102, 103) and comprises a synchronization module (107) configured to synchronize the at least two distributed computing runtimes (105, 106) and to provide data from at least one of the at least two distributed computing runtimes (105, 106) to at least another one of the at least two distributed computing runtimes.

Inventors:
BUSHEV DMITRY VYACHESLAVOVICH (CN)
SLESARENKO ALEXANDER VLADIMIROVICH (CN)
FILIPPOV ALEXANDER NIKOLAEVICH (CN)
Application Number:
PCT/RU2015/000315
Publication Date:
November 24, 2016
Filing Date:
May 19, 2015
Export Citation:
Click for automatic bibliography generation   Help
Assignee:
HUAWEI TECH CO LTD (CN)
International Classes:
G06F9/52
Other References:
BENJAMIN HINDMAN ET AL: "Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center", USENIX,, 7 March 2011 (2011-03-07), pages 1 - 14, XP061010812
LIN XIUQIN ET AL: "Log analysis in cloud computing environment with Hadoop and Spark", 2013 5TH IEEE INTERNATIONAL CONFERENCE ON BROADBAND NETWORK & MULTIMEDIA TECHNOLOGY, IEEE, 17 November 2013 (2013-11-17), pages 273 - 276, XP032599166, DOI: 10.1109/ICBNMT.2013.6823956
BEAZLEY D M ET AL: "Perl Extension Building with SWIG", INTERNET CITATION, 20 August 1998 (1998-08-20), XP002292951, Retrieved from the Internet [retrieved on 20040818]
MATEI ZAHARIA ET AL: "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing", USENIX,, 11 April 2013 (2013-04-11), pages 1 - 14, XP061014277
Attorney, Agent or Firm:
MITS, Alexander Vladimirovich (ul. Вolshaya Spasskaya 25/, Moscow 0, RU)
Download PDF:
Claims:
CLAIMS

1. A computing system (100), comprising

a plurality of cluster computation nodes (101, 102, 103) ,

a hybrid parallel computation runtime (104) comprising at least two distributed computing runtimes (105, 106) configured for parallel computations on the plurality of cluster computation nodes (101, 102, 103),

- wherein the hybrid parallel computation runtime (104) is configured to operate on data distributed to the plurality of cluster computation nodes (101, 102, 103) and comprises a synchronization module (107) configured to synchronize the at least two distributed computing runtimes (105, 106) and to provide data from at least one of the at least two distributed computing runtimes (105, 106) to at least another one of the at least two distributed computing runtimes (105, 106). 2. The system according to claim 1, wherein the synchronization module (107) is configured to synchronize states of computations of the at least two distributed computing runtimes (105, 106). 3. The system according to claim 1 or 2 , wherein the synchronization module (107) is configured to synchronize the at least two distributed computing runtimes (105, 106) at specific computation states of computations performed by at least one of the at least two distributed computing runtimes (105, 106) .

4. The system according to any one of the preceding claims, wherein the synchronization module (107) is configured to provide directed synchronization, wherein at least a first one of the at least two distributed computing runtimes (105, 106) is adapted to initiate computation in at least a second one of the at least two distributed computing runtimes (105, 106) .

5. The system according to claim 1, wherein the synchronization module (107) is configured to provide state variables or values of the state variables of at least the second one of the at least two distributed computing runtimes (105, 106) to at least the first one of the at least two distributed computing runtimes (105, 106), and wherein at least the first one of the at least two distributed computing runtimes (105, 106) is adapted to use the state variables or values of the state variables of at least the second one of the at least two distributed computing runtimes (105, 106) in its computation.

6. The system according to claim 5, wherein the synchronization module (107) is configured to provide state variables and values of the state variables, and wherein at least the first one of the at least two distributed computing runtimes (105, 106) is adapted to use the state variables and values of the state variables of at least the second one of the at least two distributed computing runtimes (105, 106) in its computation.

7. The system according to any one of the preceding claims, wherein the synchronization module (107) contains information about the state of computation of at least one of the at least two distributed computing runtimes (105, 106) , and wherein the synchronization module (107) is configured to provide data required to start computation of another distributed computing runtime (105, 106) when at least one of the at least two distributed computing runtimes (105, 106) reaches a specific state or indicates that the other distributed computing runtime (105, 106) should be started.

8. The system according to any one of the preceding claims, wherein the synchronization module (107) is configured to synchronize states of computation of the at least two distributed computing runtimes (105, 106) by sending specific commands to at least one of the at least two distributed computing runtimes (105, 106), and wherein a specific command is configured to transition a state of computation of the distributed computing runtime (105, 106) the specific command is applied to.

9. The system according to claim 8, wherein the specific command is configured to move the computation of the distributed computing runtime (105, 106) the specific command is applied to the next state partially until all computations of data on the cluster computation nodes (101, 102, 103) reach the next state.

10. The system according to claim 8 or 9, wherein the specific command is configured to move the computation of the distributed computing runtime (105, 106) the specific command is applied to the next state in steps.

11. The system of claim 8, wherein the specific command comprises transition information indicating which portion of data should be transitioned to the next state.

12. The system according to any one of the preceding claims, wherein the synchronization module (107) is configured to coordinate synchronization with at least one master module and at least one agent module. 13. The system according to claim 12, wherein the master module is configured to start a distributed computing runtime and to initiate computation.

14. The system according to claim 12 or 13, wherein the at least one agent module is configured to control computation of a task of one of the at least two distributed computing runtimes (105, 106) and to synchronize an execution of the task with an execution of at least one other task. 15. The system according to any one of the preceding claims, wherein the system comprises a storage module configured to exchange data between the at least two distributed computing runtimes (105, 106). 16. The system according to claim 15, wherein at least one task of one distributed computing runtime (105, 106) is adapted to use the storage module and to save input data required for a computation of another distributed computing runtime (105, 106) to the storage module.

17. The system according to claim 15 or 16, wherein at least one task of one distributed computing runtime (105, 106) is adapted to use the storage module and to obtain results of a computation of another distributed computing runtime (105, 106) from the storage module.

18. The system according to claim 16 or 17, wherein the task implements a predefined communication interface configured for communication with the storage module, and wherein the task is configured to use the communication interface to obtain input data from the storage module or to write a result of a computation to the storage module.

19. A method for operating a computing system (100), in particular a distributed cluster based parallel computation system, comprising a plurality of cluster computation nodes (101, 102, 103), and a hybrid parallel computation runtime (104) comprising at least two distributed computing runtimes (105, 106) configured for parallel computations on the plurality of cluster computation nodes (101, 102, 103), wherein the hybrid parallel computation runtime (104) operates on data distributed to the plurality of cluster computation nodes (101, 102, 103) and comprises a synchronization module (107) synchronizing the at least two distributed computing runtimes (105, 106) and providing data from at least one of the at least two distributed computing runtimes (105, 106) to at least another one of the at least two distributed computing runtimes (105, 106) .

Description:
SYSTEM AND METHOD FOR SYNCHRONIZING DISTRIBUTED COMPUTING

RUNTIMES

FIELD OF THE INVENTION

The present invention relates to the field of distributed parallel computing, especially to a system and method for synchronizing computational runtimes, i.e. optimizing synchronization of computing runtimes on a plurality of cluster computation nodes.

Particularly, the invention relates to optimizing the synchronizing of states of computations of at least two distributed computing runtimes. The optimization is preferably performed in a distributed cluster based parallel computation system.

BACKGROUND Running parallel or distributed computations on a computer cluster, which typically refers to a set of connected computers, e.g. in data centers, is widely adopted. There is a variety of runtimes available for executing parallel computations on a computer cluster. Examples for runtimes are MPI, Spark and Hadoop . They are often based on different languages (MPI: C/C++, Spark: Scala, Hadoop: Java)

Message Passing Interface (MPI) is a standardized message- passing system that defines the syntax and semantics of a core of library routines that provide essential synchronization and communication functionality between a set of processes in a language -independent way. More specific, it is a language-independent communications protocol used to program parallel programs running on a distributed system. Further advantages of MPI are that it provides a low- level standard interface and is directly supported by networking hardware, thereby enabling computation and communication with low overhead and giving opportunities for optimizations. Disadvantages are high costs (in terms of time and resources) to implement an algorithm. MPI is therefore good for system programming. HADOOP is a framework for implementing scalable, distributed software. It is based on the MapReduce algorithm, a programming model for concurrent calculations of large amounts of data on computer clusters, as well as on the Hadoop Distributed File System (HDFS) , a highly available file system for storing large amounts of data on the file systems of multiple computers. Hadoop allows performing intensive computation processes with large amounts of data on computer clusters.

Spark is another framework for implementing scalable, distributed software. Compared to Hadoop it has advantages considering ease of use and speed of computations. It eliminates deficiencies of Hadoop, which is not very efficient in certain use cases. Spark runs on top of existing HDFS infrastructure to provide enhanced and additional functionality. It is an alternative to Hadoop MapReduce and intended to provide a comprehensive and unified solution to manage different use cases and requirements .

The advantages of Hadoop or Spark may therefore be regarded as how to provide a high-level API for cluster infrastructure, in particular offering simplified resource and failover management. They provide a domain specific language (DSL) making it easy to write programs. Disadvantages on the other hand are, that efficiency of both runtimes can depend on the kinds of algorithms implemented and optimization is difficult to implement. Hadoop and Spark are therefore good for application programming .

Runtimes in general provide an abstraction layer for executing a computer program and, as used herein, for performing computing operations or calculation tasks on a number of nodes of the computing cluster.

For instance, runtimes can decouple a computation task from direct interaction with the computing hardware, thus providing more abstract and unified access to the underlying hardware. Most of the available runtimes are designed differently and hence are more or less suited for dealing with a specific calculative problem. Therefore, depending on the computation task at hand, a runtime should be chosen, which achieves the best results for the task. However, as each runtime has advantages and drawbacks, users have to deal with tradeoffs when choosing a runtime.

In most cases, computing clusters have different runtimes installed, enabling a user to run one or more calculation tasks using the available runtimes. That means if runtimes are used to execute the calculation task for a user, it would be convenient to use a runtime supporting a low- level implementation of a particular algorithm to increase performance, while using a second runtime to be able to write the main program logic using high-level architectures to facilitate the implementation process. As a consequence, involved runtimes need to interact and to establish inter-runtime communication to exchange information. As runtimes can be regarded as isolated environments providing unified and abstract means of access to the underlying hardware, but thereby also encapsulating programs executed in them, effective data sharing and interoperation among different runtimes is a barrier. Multiple runtimes can be running on the same computing cluster in parallel thereby sharing or competing for system resources.

The need for inter-runtime communication to exchange information is not new. However, with the exception of several workarounds which all involve drawbacks, no general solution in the prior art exists. To be able to establish communication between calculation tasks running in different runtimes the prior art shows the following techniques : A first idea is an automatic reimplementation of code, which means to automatically rewrite a program b, initially written using Runtime B primitives, from scratch as program b' in terms of Runtime A and run two programs a (written for Runtime A) and b' using the isolated environment of Runtime A, thereby enabling direct communication of the programs .

However, a transition from program b of Runtime B to program b' of Runtime A raises the problem, that performance benefits of program b, based on some key features of Runtime B, may be lost. After a transition from Runtime B to Runtime A, the destination Runtime A may lack crucial features essential to the execution of the program b, thereby rendering automatic reimplementation being impossible .

In the field of parallel programs and parallel systems general methods of synchronization are well known, for example software design patterns (SDP) like locks, and semaphores, that describe low- level methods of interoperation between processes on single node, or higher order parallel abstractions like futures, callbacks, actors and software transactional memory (STM) . Higher order parallel abstractions are based on low- level primitives and add levels of abstraction to the process. Other well known methods of synchronization are enterprise integration patterns like publisher, subscriber, router and channel, that describe communication between different systems. However, these methods are too general and cannot be applied in inter-runtime communication.

Another workaround known from the prior art is communication through external memory. This solution requires that programs or calculation tasks running in different runtimes are adapted to read and write to shared external memory. This approach has the drawback that it is not general and requires programs and algorithms to be adapted to a mutual way of exchanging information using the shared memory. The use of external memory also adds input- output overhead, thereby decreasing performance of computation. Therefore the problem arises, how to optimize inter-runtime communication to exchange information and to effectively map computation logic of a program running on a first runtime into a program running on a second runtime. The invention hence provides a solution to efficiently synchronize runtimes and allow inter-runtime communication in parallel distributed systems, which overcomes the problems of the prior art mentioned above, by providing a computing system and method for operating a computing system according to the independent claims. Further aspects and embodiments are subject to the dependent claims.

SUMMARY

According to a first aspect the invention provides a computing system, comprising a plurality of cluster computation nodes, a hybrid parallel computation runtime comprising at least two distributed computing runtimes configured for parallel computations on the plurality of cluster computation nodes, wherein the hybrid parallel computation runtime is configured to operate on data distributed to the plurality of cluster computation nodes and comprises a synchronization module configured to synchronize the at least two distributed computing runtimes and to provide data from at least one of the at least two distributed computing runtimes to at least another one of the at least two distributed computing runtimes. Such a hybrid parallel computation runtime allows combining advantages of its constituent distributed computing runtimes. Thus, a user implementing one or more programs is not restricted to the domain of a single runtime but can use the benefits and efficiency of specific runtimes, by choosing the runtimes best suited for the problems at hand.

According to a first implementation form of the first aspect, the synchronization module can be configured to synchronize states of computations of the at least two distributed computing runtimes.

This allows to efficiently distribute a program over several runtimes. In this way a specific part of a computation can be carried out on a second distributed computing runtime, which is more efficient for the specific part of the computation than the first computing runtime, before synchronizing it back to the first computing runtime.

According to a second implementation form of the first aspect, the synchronization module can be configured to synchronize the at least two distributed computing runtimes at specific computation states of computations performed by at least one of the at least two distributed computing runtimes .

Hence, it is possible to make the execution of a synchronization process dependent from events and/or states. This ensures that synchronization is performed at points in the processing intended by the programmer, or, e.g., when it is most efficient for the current computation task at hand. Thus the overall computation process is considerably accelerated.

According to a third implementation form of the first aspect, the synchronization module may be a state machine. According to a fourth implementation form of the first aspect, the synchronization module can be configured to provide directed synchronization, wherein at least a first one of the at least two distributed computing runtimes can be adapted to initiate computation in at least a second one of the at least two distributed computing runtimes.

By providing a direct synchronization technique, an efficient sequential operation mode is provided.

According to a fifth implementation form of the first aspect, the synchronization module can be configured to provide state variables or values of the state variables of at least the second one of the at least two distributed computing runtimes to at least the first one of the at least two distributed computing runtimes. At least the first one of the at least two distributed computing runtimes can be adapted to use the state variables or values of the state variables of at least the second one of the at least two distributed computing runtimes in its computation.

This provide a unified way of sharing the program computation state between computing runtime. Thus, the overall computation process is considerably accelerated.

According to a sixth implementation form of the first aspect, the synchronization module can be configured to provide state variables and values of the state variables. At least the first one of the at least two distributed computing runtimes can be adapted to use the state variables and values of the state variables of at least the second one of the at least two distributed computing runtimes in its computation.

According to a seventh implementation form of the first aspect, the synchronization module can contain information about the state of computation of at least one of the at least two distributed computing runtimes. The synchronization module can be configured to provide data required to start computation of another distributed computing runtime when at least one of the at least two distributed computing runtimes reaches a specific state or indicates that the other distributed computing runtime should be started. This allows an efficient switch between computation runtimes, improving thus the overall performance of the computation process.

According to an eight implementation form of the first aspect, the computations can be adapted to run in parallel and distributed across the cluster computation nodes.

The resulting advantages are less overall computation time due to a more effective way of sequential processing of tasks, better use of resources and scalability.

According to a ninth implementation form of the first aspect, the synchronization module can be configured to synchronize states of computation of the at least two distributed computing runtimes by sending specific commands to at least one of the at least two distributed computing runtimes. A specific command can be configured to transition a state of computation of the distributed computing runtime the specific command is applied to.

According to a tenth implementation form of the first aspect, the specific command can be configured to move the computation of the distributed computing runtime the specific command is applied to the next state partially until all computations of data on the cluster computation nodes reach the next state. According to an eleventh implementation form of the first aspect, the specific command can be configured to move the computation of the distributed computing runtime the specific command is applied to the next state in steps. This provides for advanced synchronization and monitoring capabilities as well as a refined scheduling. This ensures that in concurrent tasks computations successfully reach predefined states and that the next steps are preformed, depending on previous states. Also, debugging is supported.

According to a twelfth implementation form of the first aspect, the specific command can comprise transition information indicating which portion of data should be transitioned to the next state.

According to a thirteenth implementation form of the first aspect, a lifetime of the synchronization module can be bound to a lifetime of a computation of at least one of the at least two distributed computing runtimes.

This results in an improved management of resources, since resources allocated by the synchronization module can be freed once they are not needed. According to a fourteenth implementation form of the first aspect, the synchronization module can be configured to coordinate synchronization with at least one master module and at least one agent module . According to a fifteenth implementation form of the first aspect, the master module can be configured to start a distributed computing runtime and to initiate computation. According to a sixteenth implementation form of the first aspect, the at least one agent module can be configured to control computation of a task of one of the at least two distributed computing runtimes and to synchronize an execution of the task with an execution of at least one other task.

Using separate modules has the advantage of providing a solution that easily scales with growing complexity of computation tasks and the need for resources. Separating the provided functionality to several modules not only allows a unified way of addressing different computations and tasks, but allows making it easier for programmers to understand the handling of software. Hierarchical relationships among the different modules allow and provide a central management and therefore a more effective way of sequential processing of tasks.

According to a seventeenth implementation form of the first aspect, the system can comprise a storage module configured to exchange data between the at least two distributed computing runtimes.

According to an eighteenth implementation form of the first aspect, at least one task of one distributed computing runtime can be adapted to use the storage module and to save input data required for a computation of another distributed computing runtime to the storage module. According to a nineteenth implementation form of the first aspect, at least one task of one distributed computing runtime can be adapted to use the storage module and to obtain results of a computation of another distributed computing runtime from the storage module.

According to a twentieth implementation form of the first aspect, the task can implement a predefined communication interface configured for communication with the storage module. The task can be configured to use the communication interface to obtain input data from the storage module or to write a result of a computation to the storage module.

According to a twenty- first implementation form of the first aspect, one of the at least two distributed computing runtimes can implement a low- level message passing interface describing patterns of inter-process communication. Another one of the at least two distributed computing runtimes can be configured to operate with at least one system process, one of which being a main process which manages others, and at least one worker process.

According to a twenty-second implementation form of the first aspect, one of the at least two distributed computing runtimes can be configured to operate with a driver process and at least one worker process and can be configured to operate with Resilient Distributed Datasets.

According to a twenty-third implementation form of the first aspect, one of the at least two distributed computing runtimes can be a Spark runtime and another one of the at least two distributed computing runtimes can be a MPI runtime . According to a twenty- fourth implementation form of the first aspect, the system can be a distributed cluster based parallel computation system. According to a second aspect the invention provides a method for operating a computing system, in particular a distributed cluster based parallel computation system, comprising a plurality of cluster computation nodes, and a hybrid parallel computation runtime comprising at least two distributed computing runtimes configured for parallel computations on the plurality of cluster computation nodes, wherein the hybrid parallel computation runtime operates on data distributed to the plurality of cluster computation nodes and comprises a synchronization module synchronizing the at least two distributed computing runtimes and providing data from at least one of the at least two distributed computing runtimes to at least another one of the at least two distributed computing runtimes. According to a first implementation form of the second aspect, the synchronization module can synchronize states of computations of the at least two distributed computing runtimes . According to a second implementation form of the second aspect, the synchronization module can synchronize the at least two distributed computing runtimes at specific computation states of computations performed by at least one of the at least two distributed computing runtimes.

According to a third implementation form of the second aspect, the synchronization module can be a state machine. According to a fourth implementation form of the second aspect, the synchronization module can provide directed synchronization, wherein at least a first one of the at least two distributed computing runtimes can initiate computation in at least a second one of the at least two distributed computing runtimes.

According to a fifth implementation form of the second aspect, the synchronization module can provide states variables or values of the state variables of at least the second one of the at least two distributed computing runtimes to at least the first one of the at least two distributed computing runtimes. At least the first one of the at least two distributed computing runtimes can use the state variables or values of the state variables of at least the second one of the at least two distributed computing runtimes in its computation.

According to a sixth implementation form of the second aspect, the synchronization module can provide the state variables and the values of the state variables. At least the first one of the at least two distributed computing runtimes can use the state variables and values of the state variables.

According to a seventh implementation form of the second aspect, the synchronization module can contain information about the state of computation of at least one of the at least two distributed computing runtimes. The synchronization module can provide data required to start computation of another distributed computing runtime when at least one of the at least two distributed computing runtimes reaches a specific state or indicates that the other distributed computing runtime should be started.

According to an eighth implementation form of the second aspect, the computations can run in parallel and distributed across the cluster computation nodes.

According to a ninth implementation form of the second aspect, the synchronization module can synchronize states of computation of the at least two distributed computing runtimes by sending specific commands to at least one of the at least two distributed computing runtimes. A specific command can transition a state of computation of the distributed computing runtime the specific command is applied to.

According to a tenth implementation form of the second aspect, the specific command can move the computation of the distributed computing runtime the specific command is applied to the next state partially until all computations of data on the cluster computation nodes reach the next state .

According to an eleventh implementation form of the second aspect, the specific command can move the computation of the distributed computing runtime the specific command is applied to the next state in steps.

According to a twelfth implementation form of the second aspect, the specific command can comprise transition information indicating which portion of data should be transitioned to the next state.

According to a thirteenth implementation form of the second aspect, a lifetime of the synchronization module can be bound to a lifetime of a computation of at least one of the at least two distributed computing runtimes. According to a fourteenth implementation form of the second aspect, the synchronization module can coordinate synchronization with at least one master module and at least one agent module. According to a fifteenth implementation form of the second aspect, the master module can start a distributed computing runtime and can initiate computation.

According to a sixteenth implementation form of the second aspect, the at least one agent module can control computation of a task of one of the at least two distributed computing runtimes and can synchronize an execution of the task with an execution of at least one other task.

According to a seventeenth implementation form of the second aspect, the system can comprise a storage module configured to exchange data between the at least two distributed computing runtimes.

According to a sixteenth implementation form of the second aspect, at least one task of one distributed computing runtime can use the storage module to save input data required for a computation of another distributed computing runtime to the storage module.

According to a seventeenth implementation form of the second aspect, at least one task of one distributed computing runtime can use the storage module to obtain results of a computation of another distributed computing runtime from the storage module. According to an eighteenth implementation form of the second aspect, the task can implement a predefined communication interface configured for communication with the storage module, and wherein the task uses the communication interface to obtain input data from the storage module or write a result of a computation to the storage module.

According to a nineteenth implementation form of the second aspect, one of the at least two distributed computing runtimes can implement a low- level message passing interface describing patterns of inter-process communication. Another one of the at least two distributed computing runtimes can operate with at least one system process, one of which being a main process which manages others, and at least one worker process.

According to a twentieth implementation form of the second aspect, one of the at least two distributed computing runtimes can operate with a driver process and at least one worker process and can be configured to operate with

Resilient Distributed Datasets.

According to a twenty- first implementation form of the second aspect, one of the at least two distributed computing runtimes can be a Spark runtime and another one of the at least two distributed computing runtimes can be a MPI runtime. According to a twenty- second implementation form of the second aspect, the method can be a method for operating a distributed cluster based parallel computation system.

BRIEF DESCRIPTION OF THE DRAWINGS

The above described aspects and embodiments of the present invention will now be explained in the following also with reference to the figures.

Fig. 1 shows a schematic overview of the invention. Fig. 2 illustrates a starting point of the invention. Fig. 3 shows a schematic overview of synchronization of two distributed computing runtimes according to the invention.

Fig. 4 shows a schematic overview of data distributed across cluster computation nodes.

Fig. 5 shows a schematic overview of a synchronization process . Fig. 6 shows an ER diagram of a system setup.

Fig. 7 shows a schematic overview of a relation

according to the ER diagram in Fig. 4. Fig. 8 shows a schematic overview of a composition

according to the ER diagram in Fig. 4.

Fig. 9 shows a schematic overview of a synchronization method.

Fig. 10 shows a schematic overview of a resilient distributed dataset (RDD) .

Fig. 11 shows a schematic overview of an interface.

DETAILLED DESCRIPTION OF THE EMBODIMENTS Generally, it has to be noted that all arrangement, devices, modules, components, models, elements, units and means and so forth described in the present application could be implemented by software or hardware elements or any kind of combination thereof. All steps which are performed by the various entities described in the present application as well as the functionality described to be performed the various entities are intended to mean that the respective entity is adapted to or configured to perform the respective steps and functionalities. Even if in the following description of the specific embodiments, a specific functionality or step to be performed by a general entity is not reflected in the description of a specific detailed element of the entity which performs the specific step or functionality, it should be clear for a skilled person that these methods and functionalities can be implemented in respective hardware or software elements, or any kind of combination thereof. Further, the method of the present invention and its various steps are embodied in the functionalities of the various described apparatus elements.

The invention allows optimized synchronizing of states of computations of at least two distributed computing runtimes 105, 106. The optimization is preferably performed in a distributed and/or cluster based parallel computation system . As already mentioned, in the past it was necessary to reimplement programs running in different runtimes. It was also necessary to use established methods for synchronization such as shared memory to provide inter- runtime communication, which could not be applied in all kinds of scenarios.

The invention solves these problems by creating a hybrid parallel computation runtime 104 that comprises at least two, in particular several, e.g. heterogeneous, distributed computing runtimes 105, 106 that are interconnected by a synchronization module 107, which facilitates communication between the distributed computing runtimes 105, 106.

The invention provides a system and method for synchronization of programs running essentially in parallel on different distributed computing runtimes 105, 106. In particular, the synchronization includes control of execution of computations on involved distributed computing runtimes 105, 106 and exchange of data between distributed computing runtimes 105, 106. A Hybrid parallel computation runtime 104 in this way, allows combining advantages of its constituent distributed computing runtimes 105, 106.

Thus, a user implementing one or more programs is not restricted to the domain of a single runtime but can use the benefits and efficiency of specific runtimes, by choosing the runtimes best suited for the problems at hand. Fig. 1 shows a general setup according to the first aspect of the invention. In Fig. 1 a computing system 100 is shown. The computing system 100 comprises a plurality of cluster computation nodes 101, 102, 103 and a hybrid parallel computing runtime 104. The hybrid parallel computing runtime 104 comprises at least two distributed computing runtimes 105, 106 and a synchronization module 107. In Fig. 1 two distributed computing runtimes 105 and 106 are shown. However, the hybrid parallel computing runtime 104 can also include more distributed computing runtimes.

Moreover, in Fig. 1 exemplarily three cluster computation nodes 101, 102, 103 are shown. However, the computing system 100 can also include more or less cluster computation nodes 101, 102, 103 that can be interconnected. Arrows in Fig. 1 connecting hybrid parallel computation runtime 104 with the cluster computation nodes 101, 102, 103 illustrate that the hybrid parallel computation runtime 104 can be applied and can execute computation distributed across at least some of the cluster computation nodes 101, 102, 103. The computation can be done in parallel on the cluster computation nodes.

A cluster computation node 101, 102, 103 can be a single computing element, such as a CPU (central processing unit) , GPU (graphics processing unit) with associated units such as storage, memory, I/O and/or network units, controllers, etc., of a computation cluster. In a computation cluster, multiple computers are connected and work together, so that they can be regarded as a single system. The discrete computers are usually connected via a computer network, for example a local area network (LAN), e.g. by Myrinet, (Gigabit) Ethernet, TH Express or Infiniband. The nodes may use different operating systems and different hardware, typically use a common operating software and a hardware architecture. A computation cluster is typically used to improve performance over that of a single computer.

A distributed computing runtime 105, 106 provides an abstraction layer for executing a computer program and for performing computing operations or calculation tasks on a number of cluster computation nodes 101, 102, 103, providing more abstract and unified access to the underlying hardware of the computation cluster. The distributed computing runtime 105, 106 is not limited to operate exclusively on a single cluster computation node.

Multiple distributed computing runtimes can exist and work in parallel on even the same the cluster computation nodes 101, 102, 103. Moreover, it is also possible that a distributed computing runtime 105, 106 is executed on cluster computation nodes of more than one computation clusters. However, for the sake of simplicity, in the following only one computation cluster is described.

In a hybrid parallel computation runtime 104, the distributed computing runtimes 105, 106 can either access cluster computation nodes 101, 102, 103 directly by addressing a single cluster computation node in order to execute a computation or access the whole set of cluster computation nodes 101, 102, 103 by addressing all cluster computation nodes 101, 102, 103 as single abstract system.

The single abstract system can be provided by resource management logic in the distributed computing runtimes 105, 106 or the hybrid parallel computation runtime 104. Distributed computing runtimes 105, 106 can operate on data distributed across the cluster computation nodes 101, 102, 103.

Fig. 2 illustrates a communication of program A of runtime A with program B of runtime B. Fig. 3 shows in an overview the synchronization of program A of distributed computing runtime A with program B of distributed computing runtime B according to the invention. In particular, a hybrid parallel computing runtime 104 is shown with a distributed computing runtime 105 as primary runtime (Runtime A) , and a second distributed computing runtime 106 as a secondary runtime (Runtime B) and with a synchronization module 107. The synchronization module 107 provides a directed synchronization, in this case from parallel computing runtime 105 to parallel computing runtime 106 or vice versa. Directed synchronization means so that the primary runtime A initiates computation and calls secondary runtime B, or vice versa.

Fig. 4 shows an exemplary structure of data distributed across two cluster computation nodes nodeO and nodel during computation. The Data structure, as for example schematically illustrated in Fig. 4, can have the type "Dist [Array [Iterator [T] ]]" , whereby "Dist" represents a distribution to the cluster computation nodes, "Array [Iterator [T] ] " represents an array of data parts on each node, "T" represents a data type of the data part and "Iterator [T] " can represent an iteration over an array of data parts of type T.

A hybrid parallel computing runtime 104, as shown in Fig. 1, consists of at least two distributed computing runtimes 105, 106 and a synchronization module 107, which provides synchronization between the distributed computing runtimes 105, 106.

Synchronizing, in case of the invention, is to be regarded as adapting state and behavior of the distributed computing runtimes 105, 106 to each other. It can also mean to provide data from at least one distributed computing runtime 105 to at least another distributed computing runtime 106. The synchronization module 107 can also be called link or link module.

The synchronization module 107 does however not have to be provided statically. Its lifetime can be bound to a computation of at least one of the distributed computing runtimes 105, 106, thereby improving management of resources. Resources allocated by the synchronization module 107 thereby can be freed once they are not needed.

In the following sections, a distributed computing runtime 105, 106 is an environment which is capable of running parallel programs. A program can be a parallel program written in a domain of a distributed computing runtime 105, 106. A task can be a single unit of computation (e.g. in a program) . Function and behavior according to the invention of the aforementioned terms is again described later with regards to Fig. 6. As it is now explained in view of Fig. 5, the synchronization module 107 can synchronize states of computations. By synchronizing a computation state from for example distributed computing runtime A (referred to as "Runtime A" in Fig. 5) to distributed computing runtime B (referred to as "Runtime B" in Fig. 5) , a specific part of a computation can be carried out on distributed computing runtime B, which is more efficient for the specific part of the computation, before, e.g., synchronizing it back to distributed computing runtime A which is better suited for the rest of the computation process. In a scenario, where a hybrid parallel computation runtime 104 comprises two distributed computing runtimes A and B, distributed computing runtime A can be called primary runtime and distributed computing runtime B may be called secondary runtime .

The synchronization module 107 is capable of synchronizing distributed computing runtimes A, B at specific computation states of computations performed by at least one of the distributed computing runtimes A, B that takes part in a synchronizing process. Hence, it is possible to make the execution of a synchronization process dependent from events and/or states. This ensures that synchronization is performed at points in the processing intended by the programmer, or, e.g., when it is most efficient for the current computation task at hand. Consequently, after synchronization, the computation can be performed on a better suited distributed computing runtime B, allowing the efficient calculation of the current or next calculation step. Afterwards, a further synchronization back to distributed computing runtime A can be carried out, again depending on events and states of the current computation on distributed computing runtime B. Thus the overall computation process is considerably accelerated.

The synchronization module 107 can be represented as a state machine which synchronizes states of computations of each distributed computing runtime A, B. In a stepwise computation, a distributed computing runtime A, B transforms data by applying at least a function to computation data, thereby changing the state of computation .

The synchronization module 107 can further provide directed synchronization, wherein at least a first one of the at least two distributed computing runtimes A, B is configured to initiate computation in at least a second one of the at least two distributed computing runtimes A, B. An example is schematically shown in Fig. 5. For example, in a directed synchronization process from distributed computing runtime A to distributed computing runtime B, distributed computing runtime A initiates computation on distributed computing runtime B. The directed synchronization process can also be referred to as one distributed computing runtime "calling" the other one, defining that program a (referred to as "Program A" in Fig. 5) of distributed computing runtime A invokes program b (referred to as "Program B" in Fig. 5) of distributed computing runtime B. By providing synchronization technique, an efficient sequential operation mode is provided.

According to the prior art it was necessary that for example distributed computing runtime B was periodically reading a storage medium and checking whether data for further computation in distributed computing runtime B was stored on the storage medium by another distributed computing runtime (e.g. distributed computing runtime A). This is also known as exchanging data by shared memory or storage. By implementing directed synchronization, overhead due to periodically reading and processing stored information and response time until the beginning of computation on the secondary distributed computing runtime can be reduced, and thus overall computation speed can be improved .

The synchronization module 107 can provide state variables and/or values of the state variables of at least the first or second one of the at least two distributed computing runtimes A, B to at least the second or first one of the at least two distributed computing runtimes A, B. At least the one of the at least two distributed computing runtimes A, B can use the state variables or values of the state variables of at least one other distributed computing runtimes A, B in its computation.

By providing state variables and/or values of the state variables the state of a computation in a distributed computing runtime A, B can be unambiguously described. When, e.g., synchronizing task A of distributed computing runtime A with task B of distributed computing runtime B, the initial state of task B will contain in addition to the local variables of task B also the state variables and their values of the current state of task A. Hence, the synchronization module 107 contains information about the state of computation of distributed computing runtime A, and provides all the information necessary to start computation on distributed computing runtime B. Thereby a distributed computing runtime A, B is provided with enough information to determine the future behavior of a computation on the distributed computing runtime A, B. This solution also provides a unified way of communication and exchanging data stored in state variables . The synchronization module 107 can contain information about the state of computation of at least one of the at least two distributed computing runtimes A, B and can provide data required to start computation of another distributed computing runtime when at least one of the at least two distributed computing runtimes A, B reaches a specific state or indicates that the other distributed computing runtime A, B should be started. The data required to start computation of another distributed computing runtime A, B can include existing algorithms that can be packed as a library in the synchronization module 107. The purpose of such a library is to enable reusing the same (program) logic in different distributed computing runtimes A, B. It can map algorithms of different languages used in different distributed computing runtimes A, B to the same underlying program logic. This allows e.g. the program logic implemented by means of distributed computing runtime A to automatically be reused after synchronization in distributed computing runtime B.

According to the invention, and turning back to Fig. 1, the hybrid parallel computation runtime 104 has extended functionality as it combines the features of each distributed computing runtime 105, 106 it uses. A user using the hybrid parallel computation runtime 104 thereby benefits from a more diverse set of tools to implement algorithms. A domain specific language to operate the hybrid parallel computation runtime 104 can be created that contains combined features of several distributed computing runtimes 105, 106. In addition various code generation techniques to automatically generate programs e.g. during a synchronization process can be implemented. After synchronization, tasks A and B are not necessarily computed in parallel. As can be e.g. seen in Fig. 5, if task A has performed computation steps and has reached state 2, the synchronization module 107 provides the current state variables (and/or their values and, if required, other data to start computation of another distributed computing runtime) to distributed computing runtime B in order to start task B. After performing computation steps on distributed computing runtime B and reaching a further state of computation, distributed computing runtime B can again be synchronized with distributed computing runtime A according to the same principle. A transition from one state on another is indicated as a step. Computation then is continued in distributed computing runtime A. Distributed computing runtime A however can be synchronized with several distributed computing runtimes Bl, B2, etc. whereby Bl, B2 , etc. are running preferably in parallel, e.g. during an iterative or recursive computation task, to exploit the benefits of concurrent computation.

When having finished the computation tasks, distributed computing runtimes Bl, B2 , etc. can synchronize again with distributed computing runtime A.

Again in view of Fig. 1, the computation tasks of the involved distributed computing runtimes 105, 106 are thereby adapted to run in parallel and distributed across the cluster computation nodes 101, 102, 103. The resulting advantages are less overall computation time due to a more effective way of sequential processing of tasks, better use of resources and scalability. According to Fig. 5, the synchronization module 107 is further able to synchronize the states of computation of at least two distributed computing runtimes A, B by sending specific commands to at least one distributed computing runtime A or B.

The specific command transitions a state of computation of the distributed computing runtime A or B the specific command is applied to.

A transition in this context should be understood as a change of state in a computing system. In terms of the invention it means changing data (that is stored in at least one distributed computing runtime A or B and on at least one cluster computation node 101, 102, 103) by applying computation steps (e.g. a predefined function) to the data.

The specific commands can also be called move commands. The specific commands are applied to the distributed computing runtimes A, B and transition state of computation on the distributed computing runtimes A, B.

The specific commands can be used to synchronize the states of computation of different distributed computing runtimes A, B but also to transition the state of computation on one distributed computing runtime (e.g. distributed computing runtime B) to the next step. In a system comprising two distributed computing runtimes A and B, this e.g. includes controlling the computation steps carried out on distributed computing runtime B by transitioning state of computation on distributed computing runtime B, after data is synchronized from distributed computing runtime A to distributed computing runtime B.

A specific command sent to at least one distributed computing runtime 105, 106 can also control operations on the underlying cluster computation nodes 101, 102, 103.

Computation runs in parallel and distributed across the cluster computation nodes 101, 102, 103. In this parallel processing, computation can be carried out partially, in steps, until all pieces of data reach the next state of computation .

The synchronization module 107 allows making use of and controlling distributed computing by carrying out computation tasks across the cluster computation nodes 101, 102, 103 partially, for example by dividing a computation task. The resulting sub-tasks are then distributed across the cluster computation nodes 101, 102, 103, where respective computations are performed. The synchronization module 107 can further control and synchronize the computation of tasks to be carried out in steps, thereby providing advanced synchronization and monitoring capabilities as well as a refined scheduling. This ensures that in concurrent tasks computations successfully reach predefined states and that the next steps are preformed, depending on previous states. Also, debugging is supported.

The specific command or set of commands sent by the synchronization module 107 to the distributed computing runtimes 105, 106 can contain information about the configuration or hardware setup of cluster computation nodes 101, 102, 103 that are involved in transition of states. The specific command sent by the synchronization module 107 can initialize allocation and use of cluster computation nodes 101, 102, 103 by the distributed computing runtimes 105, 106 and/or the hybrid parallel computation runtime 104. Depending on the distributed computing runtime applied, cluster computation nodes 101, 102, 103 can be directly addressed by the distributed computing runtime or cluster computation nodes 101, 102, 103 can be regarded as an abstract single system. The specific command can also comprises transition information indicating which portion of data should be transitioned to the next state. In a specific implementation manner of the invention this is done by a move command. The move command uses a parameter p as an argument that represents what part of data should be transitioned to the next state in which manner: move (p : Path) . The argument p in this example is of type "Path" . The type of p relates to the data it represents: type Path = Node#Part#Elem. "Node" in this type definition can describe the cluster computation node the computation is carried out on. "Part" can describe a partition the addressed element is stored in. This can be an array. "Element" can describe an identifier of the addressed data element in the array. The "#" indicates that "Path" can be a complex type, defined by at least the three components or parts thereof mentioned before. The expression "Node#Part#Elem" indicates that a data element "Elem" is physical located as a data partition (e.g. a portion of data that should be used in the computation) on cluster computation node "Node" . A partition "Part" can be regarded as an array containing data elements "Elem"; "Elem" e.g. contains data of type T.

The path p of the move command can affect the node, where the relevant data is stored. Cluster computation nodes 101, 102, 103 are the structure on which distributed computing runtimes 105, 106 operate. However commands can affect computation on the distributed computing runtimes 105, 106 by influencing operation of the underlying structure, i.e. the cluster computation nodes 101, 102, 103. A move command, for example, can control synchronization of one distributed computing runtime 105, 106 with another, but may also control the transition of state on one distributed computing runtime 105, 106 to the next state on the same distributed computing runtime 105, 106.

"Path" also includes the function that should be applied to the addressed elements when transitioning from a state to the next state.

In the following, examples are given that describe the effect of the move command. T and U represent abstract data types and relate to data represented in Fig. 5 as A, B, C, D or E. T represents input values of data type T of the and U represents the resulting values of data type U:

Command 1 :

move(x: Node) - x . Array [Iterator [T] ] -> x. Array [Iterator [U] ]

Command 1 exemplary shows, how data on node x is moved to the next state, by applying function f: Array [Iterator [T] ] -> Array [Iterator [U] ]

to it whereby "Iterator" can be used for iterating over an array or a collection of elements T.

Command 2 :

move(x.y: Node#Part) - x . y . Iterator [T] -> x.y . Iterator [U] Command 2 exemplary shows, how data of a partition y on node x is moved to the next state, by applying function f: Iterator [T] -> Iterator [U] to it .

Command 3 :

move (x.y. z: Node#Part#Elem) - x.y.z : T -> x.y.z : U

Command 3 exemplary shows, how one element z of a partition y on a node x is moved to the next state, by applying function f: T -> U to it, whereby "Elem" can represent a data element, e.g. also an array, and can be of data type T.

For a better understanding, the following shows how execution of commands 1 to 3 can operate on data distributed across the cluster computation nodes: For example, a distributed collection of integers [1, 2, 3, 4, 5, 6] represents data to be distributed across cluster computation Nodel and cluster computation Node2 : Nodel contains two partitions: [1, 2], [3, 4] (e.g. two arrays, each comprising two elements);

Node2 contains one partition: [5, 6] (e.g. one array comprising two elements) . The data distributed across cluster computation Nodel and cluster computation Node2 can also be expressed as:

Nodel (Part (1, 2), Part (3, 4))

and

Node2 (Part (5, 6))

"Nodel" and/or "Node2" can contain data of type Array [Iterator [T] ] , "Nodel#Part" and/or "Node2#Part" can contain data of type Iterator [T] , and "Elem" can contain data of type T (e.g. integers 1 to 6) .

Carrying on with the example above, Commands 1 to 3 applied to the distributed collection of integers can result in state transitions:

Command 1, "move(x: Node)" applied to Nodel (e.g. using Nodel as argument) , could move the following data (which is represented as an array of iterators) to the next state: [[1, 2], [3, 4]].

Command 2, "move(x.y: Node#Part) " applied to Part2 of Nodel (e.g. using Nodel. Part2 as argument), could move [3, 4] (e.g. a collection represented as an iterator) to the next state.

Command 3, "move(x.y.z: Node#Part#Elem) " applied to Node 1, Part 2 and Element 2 (e.g. using Nodel . Part2.2 as argument) , could move element "4" to the next state .

Fig. 6 exemplarily depicts, in an entity-relationship like model, two distributed computing runtimes (distributed computing runtime A, which is referred to as "Runtime A" in Fig. 6 and distributed computing runtime B, which is referred to as "Runtime B" in Fig. 6) and further parts, such as the synchronization module 107, which provides synchronization between the distributed computing runtimes A and B.

The relations between "entities" in the model of Fig. 6, are illustrated in two ways as shown in Fig. 7 and Fig. 8. An association (as shown in Fig. 7) describes a simple relation between an entity A and an entity B. A composition (as illustrated in Fig. 8) implies, that the lifetime of an associated entity B is depending on the lifetime of a source entity A. The numbers at the ends of the lines connecting the entities in Fig. 6 show the multiplicity (the numbers of objects that participate in an association) of the entities. As shown in Fig. 6 for example, one master module is in relation to at least one agent module, whereas one synchronization module 107 is in relation to exactly one master module.

The system according to the invention, as partially and schematically shown in Fig. 6, can include the following components and concepts: Distributed computing runtime [name] : A distributed computing runtime is an environment which is capable of running parallel programs. Domain [of distributed computing runtime] : A domain is a set of features, e.g. a set of specific instructions available, available for a runtime, which can be used to implement parallel programs. Program [of distributed computing runtime] : A program, with respect to the invention, is a parallel program written in a domain of a distributed computing runtime . Synchronization module 107: The synchronization module

107 manages and coordinates synchronization between distributed computing runtimes A, B, preferably using at least one master module and at least one agent module. Its lifetime can be bound to a lifetime of a computation on the hybrid parallel computation runtime

104.

Master module: A master module starts a distributed computing runtime and initiates computation. It controls main processes and can initiate execution of tasks through agent modules . It further can provide a central point for communication with agent modules and is responsible for running main processes of several distributed computing runtimes A, B.

Agent module: The agent module is managed by a master module and controls execution of tasks. It runs on cluster computation nodes 101, 102, 103 and cooperates with the master module. In particular it controls computation of a task of distributed computing runtimes A, B and synchronizes an execution of the task with an execution of at least one other task. It can also communicate with the storage module.

Storage module: The storage module is configured to exchange data between distributed computing runtimes A, B. Data exchange can be done via external storage as well as in-memory. The storage module in particular organizes data exchange between corresponding tasks of different distributed computing runtimes A, B. Distributed computing runtimes can use the storage module to save input data required for a computation on the same or another distributed computing runtime.

Distributed computing runtimes A, B can also use the storage module to obtain results required for a computation on the same or another distributed computing runtime.

Communication interface: A task can implement a predefined communication interface to communicate with the storage module. The task can use the communication interface to obtain input data from the storage module or to write a result of a computation to the storage module .

Each runtime in addition contains the following elements: Task: A task is a single unit of computation.

Worker [process] : A worker is a process running in a runtime and is capable of computing a task. It manages execution of a Task.

Main [process] : Main is a process in a distributed computing runtime. It manages computation flow and coordinates workers . It further manages and distributes tasks between worker processes.

Process: A process is a single processing unit. The number of processes describes the level of parallelism.

In view of the described components and modules, it should be noted, that the proposed system does not apply strict limitations to the distributed computing runtimes A, B as well as the hybrid parallel computation runtime 104. The runtimes may be completely different, but they all should operate according to the above mentioned notions.

An approach using above described modules has the advantage of providing a solution that easily scales with growing complexity of computation tasks and the need for resources . Separating the provided functionality to several modules not only allows a unified way of addressing different computations and tasks, but allows making it easier for programmers to understand the handling of software. Hierarchical relationships among the different modules allow and provide a central management and therefore a more effective way of sequential processing of tasks. Regarding the storage module and the communication interface, the invention offers a standardized way for information exchange among the different distributed computing runtimes. In the following the embodiment of the system according to the invention is described with regards to Fig. 6. The specific implementation contains two distributed computing runtimes A and B (referred to as "Runtime A" and "Runtime B" in Fig. 6) .

The lifetime of the synchronization module 107 can be bound to the lifetime of a computation. The synchronization module 107 coordinates a synchronization process with master- and agent modules.

The master module is capable of starting distributed computing runtime B and initiating computation thereon. The agent module controls computation of corresponding task B on distributed computing runtimes B and synchronizes it with task A on distributed computing runtime A. The storage module is used to exchange data between distributed computing runtimes A and B. Task A on distributed computing runtime A uses the storage module to save input data required for the computation and to obtain results. Task B on distributed computing runtime B implements a predefined communication interface in order to be able to communicate with the storage module and to use the storage module to obtain input data and write the result of the computation.

According to Fig. 6, all entities are now described with respect to their lifecycle, behavior and interaction with adjacent components. In addition to the aforementioned general information about key components and concepts the elements are described with respect to a specific embodiment that contains two distributed computing runtimes A, B: Main A: Is a head process of distributed computing runtime A and starts with the beginning of the computation. Main A controls the computation schedule and coordinates execution through workers processes.

Worker A: Worker A is a worker process in distributed computing runtime A and can control the execution of tasks sent by m process main A. Workers can report about the state of the tasks that are currently being executed.

Task A: A task is a single unit of computation. In this specific embodiment it represents the computation that is to be performed by task B in distributed computing runtime B after synchronization. Task A may start an agent module and write input data required for the computation for task B on distributed computing runtime B to the storage module. Synchronization module 107: Manages the synchronization process. It is present with the beginning of the computation. Its lifetime can be bound to lifetime of whole computation and it can start the Master module.

Master module: The master module can be started by the synchronization module and can control execution of tasks by the agent modules. Agent module: Can be started by task A on distributed computing runtime A. It delegates computations to task B of the distributed computing runtime B and reports to the master module about the state of execution of the relevant tasks.

Storage module: The storage module is configured to exchange data between distributed computing runtime A and distributed computing runtime B. It can also communicate with agent modules.

Main B: Is the head process of the distributed computing runtime B and starts with the beginning of the computation. Main B controls the computation schedule and coordinates execution through worker processes. It can communicate with the master module (depending on the number of runtimes, there may be more agent and mater modules) .

Worker B: Worker B is a worker processes in the distributed computing runtime B and controls the execution of tasks (e.g. task B) , sent by process main B. Workers can report about the state of the tasks that are currently being executed.

Task B: Task B is a single unit of computation. In this specific embodiment it represents the computation that is performed by task A in the primary Runtime Before synchronization. Task B reads initial data required for the computation from the storage module and writes the result of the computation to it.

Communication Interface: The communication interface is a communication protocol between tasks A and B. It enables tasks A and B to acquire input data for the computation and to write result. Fig. 9 describes in more detail steps and stages of synchronization of two distributed computing runtimes A and B (referred to as "Runtime A" and "Runtime B" in Fig. 9) and how the synchronization module 107 participates in synchronization according to the idea of the invention.

To be able to see how the synchronization module 107 and other modules integrate with existing distributed computing runtimes, first a computation flow on a single distributed computing runtime is described, before synchronized computation process will be discussed.

The whole computation on one single distributed computing runtime can be regarded as a continuous execution of tasks by workers controlled by a main process. A computation on a single runtime comprises the following steps:

1. A computation is started by main, the head process of a distributed computing runtime. The main process sends appropriate sets of tasks to the worker processes .

2. Worker processes execute tasks and report to the main process .

3. The computation is finished when all executed tasks are completed successfully.

A specific example of the synchronization process between two distributed computing runtime A and B is shown in Fig. 9 and described in the following:

In a hybrid parallel computation runtime 104 the synchronization module 107 is initialized, when computation on distributed computing runtime A is started. Afterwards, at least one master module is created and waits for at least one agent module to start. Subsequently, task A starts an agent module which takes control over the execution of the task and reports to the master module. Multiple agent modules can be started in a parallel computation task. All agent modules report information, for example about the task and the state of computation, to the master module. Thus, the master module can control execution of tasks on other runtimes, for example on distributed computing runtime B as shown in Fig. 9. When the agent module that was started by task A is ready, the master module configures and runs process main B on distributed computing runtime B and tells the relevant agent modules to start Task B.

When the execution of task B on distributed computing runtime B is finished, the master module passes, by using the associated agent module of task B, the result and control of computation back to task A on distributed computing runtime A. The master module switches back to a "waiting for agents" mode, for example if a computation needs to be computed again, or other runtimes have to be called. After synchronizing back, task A continues normally on distributed computing runtime A, whereby state of computation is reported to main A until task A exits. As the lifetime of the relevant agent module depends on the lifetime of its task, the agent module is destroyed when task A exits. The synchronization module 107 will be destroyed with the end of the whole computation. It is kept alive during computation, as main A may want to execute tasks on other distributed computing runtimes again. Synchronization is not limited to two distributed computing runtimes. An example of a distributed computing runtime calling multiple other runtimes might be the computation of a function based on a recursive sub- function. Every time a new value of the recursive sub- function needs to be calculated, the relevant task starts an agent by providing the name of the sub- function and the previous computed value to the synchronization module 107. The master module obtains the sent information by the relevant agent module and is thereby able to choose an appropriate distributed computing runtime A, B for running a task to compute the sub- function.

The following describes further or alternative commands used to control synchronization between distributed computing runtimes A, B according to the invention. The commands are also used to describe the transitions in Fig. 9. Command reference: start (taskld: Id, s: State): new Agent

Starts a new agent, whereby taskld is the ID of the task associated with the new agent and State is the state of computation of the task. report (taskld: Id, address: Address, s: State) -> Master Reports the Id of the task, its state of computation and address (location of the task) to the master module. move(s: State) : State Moves computation to the next state by applying a transition to it. exit: Code

Provides exit code of the process.

Detailed exemplary steps of a synchronized computation process of two distributed computing runtimes A and B of a hybrid parallel computation runtime 104 are described below being grouped in stages:

Configuration stage

1. Computation starts with main A, the head process of distributed computing runtime A. Main A sends appropriate sets of tasks to the worker processes : Task A i( ie[l, ...n] . At the same time, the synchronization module is started. It configures and starts the master module. Master module's initial state is to wait for agent modules to go online .

2. Each particular worker Ai starts task Aj..

3. Instead of doing computation, each task Ai starts a corresponding agent module agenti, which reports its state to the master module.

2. Computation stage

1. Based on the reports of the agent modules, master module decides to initiate start of main B, the main process of distributed computing runtime B.

2. The master module notifies all agent modules about the set of tasks started on distributed computing runtime B, thereby linking each particular agent module agenti to corresponding task B± .

3. The actual computation is executed by the set of tasks: Task B i; i ε [Ι,,.,π] . Each agent module agenti is waiting for synchronized task B to finish and report its exit code to the master module.

3. Finalization stage

1. The master module decides whether the whole computation was successful or not and send either a "success" or a "fail" exit message to each agent module .

2. Each particular agent module agenti , based on the message it receives from the master module, exits with "success" or "error" code, thereby unblocking corresponding task Ai .

3. Each task Ai exits after code of agenti is sent and computation of distributed computing runtime A continues normally. The invention also solves the problem of improving inter- runtime communication in a distributed cluster based parallel computing system by providing a hybrid parallel computation runtime 104 that synchronizes the distributed computing runtimes Spark and MPI.

The general features of MPI and Spark are already described earlier in the background chapter of this document.

However, according to the invention, Spark can be used as a framework for distributed parallel computations, operating with a driver (master) process and worker processes. When a computation job is sent to the master process, the master process generates a computation schedule and sends appropriate pieces of work (tasks) to the worker processes, thereby working in accordance with the abstract runtime principle as described earlier. In addition to resource management, Spark provides a failover mechanism to automatically rerun tasks that failed during computation. However, Spark lacks performance in certain computation scenarios compared to MPI .

MPI implements a low- level message passing interface, describing patterns of inter-process communication. MPI thereby provides computation with low overhead and opportunity for performance optimization. MPI programs operate with at least one system process, one of which being a main process which manages others and at least one worker process. Thus MPI matches the runtime principle as well. Drawbacks of MPI are higher efforts for implementing an algorithm and the lack of resource management and a failover mechanism. According to Fig. 1, the synchronization module 107 synchronizes the distributed computing runtime Spark and the distributed computing runtime MPI in the hybrid parallel computation runtime 104. In result, the following features can be provided:

Spark equips the hybrid parallel computation runtime 104 with a high-level interface to the cluster infrastructure offering simplified resource and failover management. In addition, MPI provides the hybrid parallel computation runtime 104 with a low- level interface that is directly supported by hardware and enables computation with low overhead as well as performance optimization. Thereby some parts of a program executed in the hybrid parallel computation runtime 104 are implemented more efficiently by delegating the relevant computations to the distributed computing runtime PI. In contrast, the overall resource management features are efficiently implemented by means of the Spark distributed computing runtime that provides failover management as well. The hybrid system automatically reruns parts of tasks that failed during computation (whereby computation can be done by means of the Spark as well as the MPI distributed computing runtime) .

Here, the Spark distributed computing runtime is used to efficiently provide different ways of access to the data that is to be computed, while the MPI distributed computing runtime is used for high performance computations on the parts of data Spark distributed computing runtime provides access to. The Spark distributed computing runtime is also used to directly do computation of data itself. The synchronization module 107 is used to synchronize the Spark distributed computing runtime and the MPI distributed computing runtime.

The Spark distributed computing runtime can be configured to operate on data using resilient distributed datasets (RDDs) . RDDs in general are a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner. According to the invention, RDDs can be regarded as a set of elements distributed across the cluster computation nodes 101, 102, 103 that can be operated on in parallel. Fig. 10 shows a schematic abstraction of a RDD. Data in an RDD is organized as a collection of elements of type T that is partitioned over separate nodes.

Fig. 10 shows an RDD [T] that is partitioned across two nodes 0, 1. Each partition contains elements of type T. Individual partitions can be accessed by iterators (iter [T] ) .

A communication interface for accessing iterator iterator [T] is shown in Fig. 11. It is implemented by a task to communicate with the storage module. It enables to read an element of a certain type T, check if a further element is present and traverse to it. Operations on data are carried out in terms of transformations of RDDs . In terms of RDDs, Spark provides access to data on RDD-, partition- and iterator- level . As Spark has different ways to access data it operates on, therefore, when synchronizing Spark and MPI the same procedures are applied as while accessing data. They are described in details below:

T and U represent abstract data types and partially relate to the data in the Fig. 10 whereby T represents the data type of the input values and U represents the data type of the result (not shown in Fig. 10) .

According to a first aspect of this embodiment, when synchronizing Spark and MPI, MPI operates on data on RDD level, where all tasks computed in MPI represent a distributed function f : RDD [T] -> RDD [U] RDD [T] = Array [Iterator [T] ] f : Array [Iterator [T] ] -> Array [Iterator [U] ] Operating on data on RDD level describes a transformation of RDD [T] by applying function f to it.

By means of MPI, function f is constructed in terms of Dist and has the type Dist [Array [Iterator [T] ]] . The MPI implementation uses Dist as an abstraction for distributed computation. The Dist [T] data type is a collection of values (like an Array [T] ) , where each element of the collection resides in a separate MPI sub-process which run as part of the whole distributed MPI process. When operating on data on RDD level multiple elements in multiple partitions across multiple nodes can be addressed.

According to a second aspect of this embodiment, when synchronizing Spark and MPI, MPI operates on data on partition level, where each task B represents a function f : Iterator [T] - > Iterator [U]

Operating on data on partition level describes transformation of an RDD partition. MPI in this scenario can operate on arrays. Thus function f ' , which transforms data represented by arrays, is used in MPI distributed computing runtime to implement function f as the following: f (items) = f '( items . toArray) . tolterator

According to a third aspect of this embodiment, when synchronizing Spark and MPI, MPI operates on data on iterator level; f: T -> U Operating on data on iterator level describes transformation of each value of a collection of type T by applying function f to the Iterator. The corresponding MPI implementation can be represented by function f : (S,T) -> (S,U) which takes the current state of a variable of type S in MPI distributed computing runtime and next item of type T from Spark distributed computing runtime and produces next item of type U and updates the state of the variable of type S .

Performance of a hybrid parallel computation runtime 104 with a Spark distributed computing runtime and a MPI distributed computing runtime was compared to pure Spark and MPI solutions, and tested with a sparse matrix dense vector multiplication (SMDVM) algorithm in double precision. The program scenario included loading input data, performing computations and saving results. To measure performance only the computations part was taken into account .

The hybrid parallel computation runtime 104 was configured to the effect that the Spark distributed computing runtime used a cluster consisting of 8 cluster computation nodes

101, 102, 103, 1 CPU unit each. The MPI distributed computing runtime used 1 cluster computation node with 8 CPU units each. Overall computing powers of these two sets of cluster computation nodes 101, 102, 103 were equal. The input data was a sparse 100k x 10k matrix with a 10% density (percentage of non-zero elements) . The solution using a single Spark distributed computing runtime was the slowest. All computations on the Spark distributed computing runtime took ca. 25 seconds. The setup using exclusively the MPI distributed computing runtime was about 2x faster, e.g. by 13 seconds.

The main part of the program executed in the hybrid parallel computation runtime 104 comprising a Spark and an MPI distributed computing runtime was written in Spark domain specific language while the multiplication algorithm was written using MPI. The solution showed in-between results, as expected (15 seconds (in-memory) , resp. 19 seconds (HDFS) ) . The slowdown compared to the pure MPI solution is mainly caused by data exchange 10 overhead. Data exchange can also be improved by using a more efficient data exchange method (for example in-memory methods instead of a distributed file system (HDFS) ) .

The invention has been described in conjunction with various embodiments herein. However, other variations to the enclosed embodiments can be understood and effected by those skilled in the art and practicing the claimed invention, from a study of the drawings, the disclosure and the appended claims. In the claims, the word "comprising" does not exclude other elements or steps, and the indefinite article "a" or "an" does not exclude a plurality. A single processor or other unit may fulfill the functions of several items recited in the claims. The mere fact that certain measures are recited in mutually different dependent claims does not indicate that a combination of these measures cannot be used to advantage. A computer program may be stored/distributed on a suitable medium, such as an optical storage medium or a solid-state medium supplied together with or as part of other hardware, but may also be distributed in other forms, such as via the internet or other wired or wireless telecommunication systems .