Login| Sign Up| Help| Contact|

Patent Searching and Data


Title:
DATA HANDLING DEVICE, DATABASE SYSTEM AND METHOD FOR OPERATING A DATABASE SYSTEM WITH EFFICIENT COMMUNICATION
Document Type and Number:
WIPO Patent Application WO/2017/176144
Kind Code:
A1
Abstract:
A data handling device (40), adapted to operate as part of a distributed database system (404) is provided. The data handling device (40) comprises a logical planner (42), adapted to generate a logical plan based upon a database query, a physical planner (43), adapted to generate a physical plan based upon the logical plan, a marking unit (44), adapted to determine communication operators within the physical plan wherein the communication operators are operators containing communication, determine communication patterns of the communication operators, based upon operator types of the communication operators, and mark the determined communication operators, each with a data marker comprising the determined communication pattern of the communication operator. Moreover, the data handling device (40) comprises a code generator (45), adapted to generate an executable code based upon the physical plan and convert the data markers into communicator instructions. Furthermore, the data handling device (40) comprises a code executer (46), adapted to execute the executable code and a communicator (47) adapted to perform communication to further data handling devices (402,403) within the distributed database system (404) based upon the communicator instructions.

Inventors:
KOLMAKOV DMITRY SERGEEVICH (CN)
SLESARENKO ALEXANDER VLADIMIROVICH (CN)
ZHANG XUECANG (CN)
Application Number:
PCT/RU2016/000191
Publication Date:
October 12, 2017
Filing Date:
April 05, 2016
Export Citation:
Click for automatic bibliography generation   Help
Assignee:
HUAWEI TECH CO LTD (CN)
KOLMAKOV DMITRY SERGEEVICH (CN)
International Classes:
G06F17/30
Foreign References:
US20110302583A12011-12-08
EP2660732A12013-11-06
Other References:
None
Attorney, Agent or Firm:
LAW FIRM "GORODISSKY & PARTNERS " LTD et al. (RU)
Download PDF:
Claims:
Claims l. A data handling device (40), adapted to operate as part of a distributed database system (404), comprising:

- a logical planner (42), adapted to generate a logical plan based upon a database query,

- a physical planner (43), adapted to generate a physical plan (70) based upon the logical plan,

- a marking unit (44), adapted to

o determine communication operators (601, 602, 604, 605) within the physical plan (70), wherein the communication operators (601, 602, 604,

605) are operators containing communication,

o determine communication patterns of the communication operators (601,

602, 604, 605), based upon operator types of the communication operators (601, 602, 604, 605),

o mark the determined communication operators (601, 602, 604, 605), each with a data marker (71, 72, 73, 74, 75) comprising the determined communication pattern of the communication operator (601, 602, 604,

605),

- a code generator (45), adapted to

o generate executable code based upon the physical plan (70),

o convert the data markers (71, 72, 73, 74, 75) into communicator

instructions,

- a code executor (46), adapted to execute the executable code,

- a communicator (47), adapted to perform communication to further data handling devices (402, 403) within the distributed database system (404), based upon the communicator instructions.

2. The data handling device (40) according to claim

wherein the database query is an SQL query.

3. The data handling device (40) according to claim 2,

wherein the marking unit (44) is adapted to determine operators effecting

communication within the distributed database system, especially Replication operators, and/or Map Reduce operators, and/or Sort operators, and/or Shuffle Join operators, and/or Hash Join operators, and/or Broadcast Hash Join operators, and/or Merge Join operators as communication operators (601, 602, 604, 605).

4. The data handling device (40) according to claim 3,

wherein the marking unit (44) is adapted to distinguish between a set of network communication patterns based on the communication operators, and/or

wherein the marking unit (44) is adapted to determine a peer-to-peer communication pattern for Replication operators, and/ or MapReduce operators, and/or Sort operators, and/or Shuffle Join operators, and/or Hash Join operators, and/or Broadcast Hash Join operators, and/or Merge Join operators, and/or

wherein the marking unit (44) is adapted to determine a multicast or broadcast communication pattern for Replication operators and/ or Broadcast Hash Join operators, and/or

wherein the marking unit (44) is adapted to determine an All-to-all communication pattern for Shuffle Join operators, and/or Hash Join operators, and/or Merge Join operators.

5. The data handling device (40) according to any of the claims 1 to 4,

wherein the communicator (47) is adapted to dynamically determine a

communication protocol to be used for each operator based upon at least the communicator instructions.

6. The data handling device (40) according to claim 5,

wherein the data markers (71, 72, 73, 74, 75) furthermore comprise a total amount of data to be communicated by the operator, and

wherein the communicator (47) is adapted to dynamically determine the

communication protocol to be used for each operator, additionally based upon the total amount of data to be communicated by the operator. 7. The data handling device (40) according to claim 5 or 6,

wherein the communicator (47) is adapted to perform the communication based upon the determined communication protocol for each operator.

8. The data handling device (40) according to any of the claims 1 to 7,

wherein the data handling device (40) furthermore comprises a storage unit, adapted to store at least part of data stored in the distributed database system.

9. The data handling device (40) according to any of the claims 1 to 8,

wherein the data handling device (40) furthermore comprises a query receiver (41), adapted to receive a database query.

10. The data handling device (40) according to any of the claims 1 to 9,

wherein the communicator (47) is adapted to transmit at least parts of data to be processed to the further data handling devices. 11. A database system (404) comprising at least a first data handling device (40) according to any of the claims 1 to 10 and a second data handling device (402) according to any of the claims 1 to 10,

wherein the communicator (47) of the first data handling device (40) is adapted to perform communication to at least the second data handling device (402) based upon the determined communicator instructions.

12. The database system (404) according to claim 11,

wherein the database system (404) comprises at least a third data handling device (403), and

wherein the communicator (47) of the first data handling device (40) is adapted to perform communication to at least the second data handling device (402) and the third data handling device (403) based upon the determined communicator instructions. 13. A method for operating a database system comprising a plurality of data handling devices, comprising:

- generating (130) a logical plan based upon a database query,

- generating (131) a physical plan (70) based upon the logical plan,

- determining (132) communication operators (601, 602, 604, 605) within the physical plan (70),

- determining (133) communication patterns of the communication operators, based upon operator types of the communication operators (601, 602, 604, 605),

- marking (134) the determined communication operators, each with a data marker (71, 72, 73, 74, 75) comprising the determined communication pattern of the communication operator,

- generating (135) executable code based upon the physical plan (70), - converting (136) the data markers (71, 72, 73, 74, 75) into communicator instructions,

- executing (137) the executable code,

- performing (138) communication to further data handling devices within the distributed database system, based upon the communicator instructions.

14. A computer program with a program code for performing the method according to claim 13 when the computer program runs on a computer.

Description:
DATA HANDLING DEVICE, DATABASE SYSTEM AND METHOD FOR OPERATING A DATABASE SYSTEM WITH EFFICIENT COMMUNICATION

TECHNICAL FIELD

The invention relates to the field of computer software engineering, and more specifically to distributed database systems.

BACKGROUND

Distributed database systems have a plurality of individual nodes, also referred to as data handling devices. When performing a database query, communication between these different nodes occurs. Especially in database systems with a plurality of nodes, this communication can be the bottleneck of the database system.

An SQL query execution pipeline may be divided into a number of steps. This is shown along Fig. 1.

1. Planning: In a first step 12, query plain text 11 is translated to a logical plan which is an intermediate tree-like representation of the query. The logical plan is optimized and translated to a physical plan in a second step 13, which can be also optimized taking into account data parameters. The physical plan consists of physical plan operators which represent some basis operation on data set in accordance to the database low-level interface.

2. Code generation: In a third step 14, an executable code is generated based on the physical plan. This improves the performance of the database system. This approach is used in some exemplary frameworks for distributed SQL query execution: SparkSQL, Cassandra, etc.

3. Execution: In a fourth step 15, the code prepared in the previous step 14 is executed. In case of a distributed database, an execution is performed simultaneously on a cluster of workstations connected in a network.

In FIG. 2, the connection between data 20 consisting of data chunks 21-23 and an execution plan 24 involving a number of nodes 25-27 is depicted. Especially, it is shown here that different data chunks 21-23 are stored on different nodes 25-27 and only in interaction of these different nodes 25-27, the execution plan 24 can be executed leading to a result 28.

In case of distributed query execution, data to be processed is spread within a cluster, so that each machine stores only a part of the source dataset. Nevertheless some operations on a dataset may require a data exchange between the nodes of the cluster, for example, an operation of aggregation which performs accumulation of a single value based on the whole dataset. Such network communication may greatly reduce the performance of a database system either if they involve big data sets or if they aren't performed in an optimal way.

Different SQL Physical layer operators may generate a network traffic which can match different communication patterns. Such communication patterns are shown in Fig. 3a - Fig. 3c In FIG. 3a, a Peer-to-Peer communication pattern between two nodes 30 is shown. In FIG. 3b, a multicast communication pattern between a plurality of nodes 31 is shown. Here, the communication originates from one of the nodes 31 and terminates at a number of the nodes 31. In FIG. 3c, an All-to-All communication pattern between a plurality of nodes 32 is shown. Every node can communicate with every other node in this communication pattern.

All these patterns are actively used in distributed query execution. Multicast is used for data replication, All-to-All communication pattern is used for shuffling and Peer- to-peer pattern is used as a basis for all other types of communication. Exemplary solutions for distributed query execution don't differ between these patterns. However network performance greatly depends on the implementation of communication patterns and specialized transport protocols may have better performance for some particular communication pattern.

An exemplary protocol, which can be used as a transport layer, TCP is well suited for Peer-to-Peer communication since communications within this protocol are performed through the previously created Point-to-Point connections. A multicast communication pattern implemented with TCP is ineffective because it generates a lot of duplicated traffic in the network since the same data should be transmitted multiple times - to each destination through the corresponding connection.

A spark framework implements broadcast communication through the usage of 1

BitTorrent application level protocol which is still based on TCP. Such an approach allows speeding up broadcasting but it has several drawbacks:

1. Nevertheless some nodes can receive broadcasted data from neighbor nodes decreasing the load on the sending node, it doesn't solve the problem of duplicated packets in general. Network shall convey as many identical packets as there are nodes in the network.

2. The additional protocol acting on top of transport layer leads to additional

overhead which will severely affect broadcasting performance for small messages. In contrast, the TIPC transport layer protocol natively supports a multicast communication pattern and shows dramatically better performance than TCP within this pattern. But Peer-to-Peer performance of TIPC is worse than TCP so there is no univocal answer on a question what protocol shall be used. We identify the following three main problems of exemplary solutions:

1. Exemplary solutions are based on a single networking transport protocol

statically chosen by some parameters. Such an approach leads to an overhead in data exchange within communication patterns, where the chosen protocol doesn't work well.

2. General purpose protocols are designed to be suitable in all possible situations.

Their usage leads to an overhead caused by protocol generality even if it is used only in a couple of usage patterns.

3. Additional logic placed on top of transport layer leads to an additional

overhead.

SUMMARY

Accordingly, an object of the present invention is to provide an apparatus and method, which allow for an efficient communication within distributed databases. The object is solved by the features of claim 1 for the apparatus and claim 13 for the method. Further it is solved by the features of claim 14 for the associated computer program. The dependent claims contain further developments.

According to a first aspect of the invention, a data handling device, adapted to operate as part of a distributed database system is provided. The data handling device comprises a logical planner, adapted to generate a logical plan based upon a database query, a physical planner, adapted to generate a physical plan based upon the logical plan, a marking unit, adapted to determine communication operators within the physical plan, wherein the communication operators are operators containing communication, determine communication patterns of the communication operators, based upon operator types of the communication operators, and mark the determined communication operators, each with a data marker comprising the determined communication pattern of the communication operator. Moreover, the data handling device comprises a code generator, adapted to generate executable code based upon the physical plan and convert the data markers into communicator instructions.

Furthermore, the data handling device comprises a code executer, adapted to execute the executable code, and a communicator adapted to perform communication to further data handling devices within the distributed database system based upon the communicator instructions. It is thereby possible to separate the communication task from the regular database operations thereby enabling a very efficient communication leading to a very efficient database operation.

According to a first implementation form of the first aspect, the database query is an SQL query. This allows for the use of readily available database components. According to an implementation form of the previous implementation form of the first aspect, the marking unit is adapted to determine operators effecting communication within the distributed database system, especially Replication operators, and/or MapReduce operators and/or Sort operators, and/or Shuffle Join operators, and/or Hash Join operators, and/or Broadcast Hash Join operators, and/or Merge Join operators as communication operators. This allows for a very efficient database operation.

According to an implementation form of the previous implementation form of the first aspect, the marking unit is adapted to distinguish between a set of network communication patterns based on the communication operators. Additionally or alternatively, the marking unit is adapted to determine a Peer-to-Peer communication pattern for Replication operators and/or MapReduce operators and/or Sort operators and/or Shuffle Join operators and/ or Hash Join operators and/or Broadcast Hash Join operators, and/or Merge Join operators. Additionally or alternatively, the marking unit is adapted to determine a multicast or broadcast communication pattern for Replication operators and/or Broadcast Hash Join operators. Additionally or alternatively, the marking unit is adapted to determine an All-to-All communication pattern for Shuffle Join operators, and/or Hash Join operators, and/or Merge Join operators. A very flexible selection of the communication pattern based upon the operators is thereby achieved.

According to a further implementation form of the first aspect or the previously shown implementation forms, the communicator is adapted to dynamically determine a communication protocol to be used for each operator based upon at least the communicator instructions. This allows for an especially effective database operation.

According to an implementation form of the previous implementation form, the data markers furthermore comprise a total amount of data to be communicated by the operator. The communicator is then adapted to dynamically determine the communication protocol to be used for each operator, additionally based upon the total amount of data to be communicated by the operator. This allows for even better selecting the most suitable communication protocol, leading to an improvement of the database operation efficiency.

According to a further implementation form of the previous two implementation forms, the communicator is adapted to perform the communication based upon the determined communication protocol for each operator. This allows for an especially effective database operation.

According to a further implementation form of the first aspect or the previously shown implementation forms of the first aspect, the data handling device further comprises a storage unit, which is adapted to store at least part of the data stored in the distributed database system. By dividing the data between different data handling devices in the distributed database system, an especially efficient database operation is achieved.

According to a further implementation form of the first aspect or the previously shown implementation forms of the first aspect, the data handling device furthermore comprises a query receiver, adapted to receive a database query. This allows for the processing of standardized database queries.

According to a further implementation form of the first aspect or any of the previous implementation forms, the communicator is adapted to transmit at least parts of data to be processed to the further data handling devices. It is thereby possible to not store all data in all data handling devices, but to conserve storage space.

According to a second aspect of the invention, a database system comprising at least a first data handling device according to the first aspect or any of the implementation forms of the first aspect and a second data handling device according to the first aspect or any of the implementation forms of the first aspect is provided. The communicator of the first data handling device is adapted to perform communication to at least the second data handling device based upon the determined communicator instructions. An especially efficient database system is thereby achieved.

According to an implementation form of the second aspect, the database system comprises at least a third data handling device. The communicator of the first data handling device is then adapted to perform communication to at least the second data handling device and the third data handling device based upon the determined communicator instructions. An especially efficient database system is thereby achieved.

According to a third aspect of the invention, a method for operating a database system comprising a plurality of data handling devices is provided. The method comprises generating a logical plan based upon a database query, generating a physical plan based upon the logical plan, determining communication operators within the physical plan, determining communication patterns of the communication operators, based upon operator types of the communication operators, and marking the determined communication operators, each with a data marker comprising the determined communication pattern of the communication operator. Moreover, the method comprises generating executable code based upon the physical plan, converting the data markers into communicator instructions, executing the executable code, and performing communications to further data handling devices within the distributed database system, based upon the communicator instructions. An especially efficient operation of the distributer database system is thereby achieved.

According to a fourth aspect of the invention, a computer program with a program code for performing the method according to the third aspect of the invention, when the computer program runs on a computer, is provided. An especially efficient database operation is thereby enabled. In general, in order to solve the problems identified above a new approach to implement a network communication for distributed database query execution pipeline is proposed. The following steps are advantageously performed:

l. Extend planning phase with the following:

Recognize and mark nodes (Physical Operators) of query physical plan which contains communications.

2. Extend code generation phase with the following:

In accordance with the data markers added on step ι add data wrappers to create application level messages consisted of: a) communication pattern identifier, b) additional service information and c) the data to be exchanged.

3. Extend execution phase with the following:

a. Extend SQL query execution system with a communicator which encapsulates all transport layer protocols to be used at runtime.

b. Provide that Communication pattern identifier and additional service

information are conveyed with data to a communicator.

4. Dynamically choose the best communication protocol to use for data exchange within the specified communication pattern. Generally, it has to be noted that all arrangements, devices, elements, units and means and so forth described in the present application could be implemented by software or hardware elements or any kind of combination thereof. Furthermore, the devices may be processors or may comprise processors, wherein the functions of the elements, units and means described in the present applications may be implemented in one or more processors. All steps which are performed by the various entities described in the present application as well as the functionality described to be performed by the various entities are intended to mean that the respective entity is adapted to or configured to perform the respective steps and functionalities. Even if in the following description or specific embodiments, a specific functionality or step to be performed by a general entity is not reflected in the description of a specific detailed element of that entity which performs that specific step or functionality, it should be clear for a skilled person that these methods and functionalities can be implemented in respect of software or hardware elements, or any kind of combination thereof. BRIEF DESCRIPTION OF DRAWINGS 6 000191

The present invention is in the following explained in detail in relation to

embodiments of the invention in reference to the enclosed drawings, in which

FIG. l shows an exemplary database query execution pipeline;

FIG. 2 shows exemplary distributed data handling;

FIG. 3a shows an exemplary Peer-to-Peer communication pattern;

FIG. 3b shows an exemplary multicast communication pattern;

FIG. 3c shows an exemplary All-to-All communication pattern;

FIG. 4 shows a first embodiment of the data handling device according to the first aspect of the invention;

FIG. 5 shows an extension of a planning phase as employed by a second

embodiment of the first aspect of the invention;

FIG. 6 shows an exemplary physical plan generated by a third embodiment of the data handling device according to the first aspect of the invention;

FIG. 7 shows an exemplary physical plan extended with data markers as

employed by a fourth embodiment of the data handling device according to the first aspect of the invention;

FIG. 8 shows a detail of a fifth embodiment of the data handling device

according to the first aspect of the invention;

FIG. 9 shows an exemplary database query execution process as employed by a sixth embodiment of the data handling device according to the first aspect of the invention;

FIG. 10 shows communication through different networks with multiple

transport layer protocols as used by a seventh embodiment of the data handling device according to the first aspect of the invention; shows a detail of an eighth embodiment of the data handling device of the first aspect of the invention; FIG. 12 shows exemplary traffic multiplexing between different nodes in a

distributed database system;

FIG. 13 shows a first embodiment of the method for operating a database

system according to the third aspect of the invention;

FIG. 14 shows achievable results when using a data handling device according to the first aspect of the invention or a method for operating a distributed database system according to the third aspect of the invention for an entire test case;

FIG. 15 shows achievable results when using a data handling device according to the first aspect of the invention or a method for operating a distributed database system according to the third aspect of the invention for a single Broadcast Hash Join operation;

FIG. 16 shows achievable results when using a data handling device according to the first aspect of the invention or a method for operating a distributed database system according to the third aspect of the invention as processing time in % versus a baseline.

DESCRIPTION OF EMBODIMENTS

First, the function of exemplary distributed database systems was demonstrated along FIG. 1 and FIG. 2. Along FIG. 3a - FIG. 3c, different communication patterns were explained. With reference to FIG. 4 - FIG. 12, different embodiments of the data handling device according to the first aspect of the invention are explained. Along FIG. 13, the function of the method according to the third aspect of the invention is described in detail. Finally, along FIG. 14 - FIG. 16 achievable efficiency increases are shown. Similar entities and reference numbers in different figures have been partially omitted.

Below is a definition of terms and their meaning:

In FIG. 4, a first embodiment of the database system 404 of the second aspect of the invention including a first embodiment of the data handling device 40 of the first aspect of the invention is shown. The data handling device 40 comprises a query receiver 41, which is connected to a logical planner 42, which again is connected to a physical planner 43. The physical planner 43 is connected to a marking unit 44, which again is connected to a code generator 45. The code generator 45 moreover is connected to a code executer 46, which in turn is connected to a communicator 47. All units 41 - 47 are connected to a control unit 48. The communicator 47 moreover is connected to a network 401, which is connected to further data handling devices 402 and 403. The network 401 and the data handling devices 402, 403 are not part of the data handling device 40. Together with the data handling device 40, they form the distributed database system 404, though. The control unit 48 controls the function of all other units of the data handling device 40.

In the distributed database system, the query receiver 41 receives a database query, especially an SQL query. The query is processed and handed on to the logical planner 42. The logical planner 42 generates a logical plan based upon the database query. This logical plan is handed to the physical planner 43, which generates a physical plan from the logical plan. The physical plan is handed on to the marking unit, which determines communication operators within the physical plan. Communication operators are operators containing communication. The marking unit 44 then determines communication patterns of the communication operators based upon the operator types of the communication operators. Especially, the marking unit determines operators effecting communication within the distributed database system. Especially, Replication operators, and/or Map Reduce operators and/or Sort operators and/or Shuffle Join operators and/or Hash Join operators and/or Broadcast Hash Join operators and/or Merge Join operators are determined as communication operators. Finally, the marking unit marks the determined communication operators, each with a data marker comprising the determined communication pattern of the communication operator.

Moreover, the marking unit 44 distinguishes between a set of network communication patterns based upon the communication operators. Especially the marking unit determines a Peer-to-Peer communication pattern for Replication operators and/ or MapReduce operators and/or Sort operators and/or Shuffle Join operators and/or Hash Join operators and/or Broadcast Hash Join operators and/ or Merge Join operators. Also, the marking unit determines a multicast or broadcast communication pattern for Replication operators and/or Broadcast Hash Join operators. The marking unit 44 moreover determines an All-to-All communication pattern for Shuffle Join operators and/or Hash Join operators and/or Merge Join operators.

The marked physical plan is then headed on to the code generator which generates executable code based upon the physical plan and converts the data markers into communicator instructions. These are handed on to the code executer 46 which executes the executable code. Moreover, the communicator 47 performs

communication to further data handling devices 402, 403 based upon the

communicator instructions.

The marking unit 44 moreover marks the operators with the total amount of data to be communicated. The communication protocol to use for each operator is then determined by the communicator 47 based upon the total amount of data to be transmitted and based upon the communication pattern. SQL is the de-facto standard of database access method. Below is an example of SQL Q3 query from an exemplary TPC-H benchmark:

1 select

2 l_orderkey,

3 sum(l_extendedprice*(i-l_discount)) as revenue,

I 4 o_orderdate,

5 o_shippriority

7 customer join orders on c_custkey = o__custkey

8 join lineitem on l_orderkey = o_orderkey

I 9 where

10 c _mktsegment = 'HOUSEHOLD'

11 and o_orderdate < 19950304

12 and l_shipdate > 19950304

13 group by 14 l_orderkey,

15 o_orderdate,

16 o_shippriority

17 order by

18 revenue desc,

19 o_orderdate

20 limit 10;

SQL query text is handled by a database engine which in a first step translates SQL query text to a tree-like representation known as logical plan. After that it performs a logical optimization resulting in an Optimized logical plan which in turn is translated to the low-level database API basis. This resulting plan is called physical plan and it may be also optimized taking into account database physical parameters. Leaves of the physical plan tree represent data sources and nodes are physical plan operators which represent different elementary operations on the relational database. At the end of this chain an additional step of physical plan handling is added. This is presented in detail in Fig. 5. An input 50 consists of a physical plan 51 and data related information 52. This input 50 is handed to a function block 53 constituting an extended planning phase. This function block 53 comprises detecting communication related physical plan operators 54. As an input for this recognizing of the

communication related operators, correspondences of operators and communication patterns, which are stored in a knowledge base 56, are used. The extended planning phase 53 moreover comprises marking the respectively detected operators in a function block 55. Finally, an extended physical plan is handed on as output 57.

In FIG. 6, an exemplary physical plan 60 is shown. The physical plan 60 comprises a number of operators 601 - 614. Communication operators 601, 603, 604, 605 are marked with a dashed line. These operators 601, 603, 604 and 605 are detected.

In FIG. 7, an exemplary extended physical plan 70 is shown. Here, the additional data markers 71, 72, 73, 74 and 75 are integrated into the extended physical plan 70. Each one of the data markers 71 - 75 contains information regarding the employed communication pattern. Additional information can be stored within the data markers.

The above-mentioned data markers have a strict connection to the associated physical plan operators and convey information about a communication pattern -for example its Communication Pattern ID - to be used for further data exchange.

The choice of a Communication Pattern ID to be associated with the particular physical plan operator is made using for example the following table. This table specifies the correspondence between physical plan operators and communication patterns:

In FIG. 8, details of the code generation phase are shown. As an input, an extended physical plan 81, as generated by the marking unit 44 of FIG. 4 is employed. This extended physical plan is handed on to a code generator 82, which corresponds to the code generator 45 of FIG. 4. The code generator 82 uses information stored in a code generation library 84. Especially, the code generator 82 uses a set of conversion rules for physical plan operators 85 and a set of library modifications comprising a data marker converter and modifications for existing methods. As an output, executable code 83 is produced by the code generator 82.

One possible exemplary code generation approach is to convert the physical plan to code written in a general-purpose language, for example C++. Such an approach has 16 000191

the advantage that the generated code may be compiled to an executable code with additional optimizations provided by a compiler. Code generation is then performed by a dedicated module - the code generator 82, which converts the tree representation of the physical plan to the plain executable code. An Extended physical plan generated in the previous step contains a new type of physical plan operator - a data marker which is also converted within the executable code. The code generator is therefore extended with a converter for data marker physical plan operators

Besides the converter for the data marker, also the converters for the existing physical plan operators need to be modified in order to provide the Communication layer with an access to communication related data.

An exemplary prototype query execution system is called Flint. Flint allows executing of query physical plans represented in C++ which can be considered as the Code generation phase output. A detailed description of this approach is given later on.

During the execution phase the generated program is run on distributed cluster. Each of the nodes in the cluster handles a part of input data synchronously which means that they perform the same operations on the data at the same moment. In Fig. 9 the SQL query execution process is shown for one separate node of the cluster, the presented diagram presents the execution 90 of a particular marked physical operator.

As an input, data 91 which is to be processed and a data marker 92 are used. The data marker 92 comprises communication related information 98, for example a communication pattern ID and additional service information. Moreover, the generated executable code 93 is processed. This results in a processing of local data 94, data to be exchanged 95 and resulting data 96. The data to be exchanged 95 moreover is communicated by a communicator 99, which corresponds to the communicator 47 of FIG. 4. After processing the executable code 93, a processing of a next operator 97 is performed.

The processing by the communicator is shown along FIG. 10. The code execution 101 comprises an executer application 102, extended with a communicator 103, which encapsulates all transport layer protocols 104, 105, 106 to be used at runtime. Each of the transport protocol 103, 104, 105 forms a network 1001 between all cluster nodes

107, 108, 109 and their addressing is explicitly translated to the addressing used by the application. For this goal, the communicator 103 forms a translating table where a correspondence between the Application layer and the transport layer addresses is stored:

The communicator 103 may be based on an arbitrary number of transport layer protocols 104, 105, 106 and can even encapsulate other application layer protocols.

A large scale scheme of the Communicator 113, which corresponds to the

communicator 103 of Fig. 10 and the communicator 47 of Fig. 4, is shown in Fig. 11. The communication related information 111 associated with the Input data 110 is used for Communication protocol selection 114 described in the following. The

communication pattern is chosen based on information stored in a knowledge base 116. The chosen Protocol ID is conveyed along with data to a transmitter 115.

Reception of data by a receiver 117 does not require any special operations. Received data is simply conveyed to a higher protocol layer as output data 112.

In order to choose the communication protocol, all the data to be exchanged is conveyed from the application to the communicator 113. This data may either contain or not contain additional information associated with it. If the data has no service information it is transmitted using the default transport protocol. In the following table, possible correspondences between determined communication patterns and resulting communication protocols are shown:

Multireceive Multipath TCP

All-to-All TCP

This list is extendable.

If data is marked with additional information it is transmitted using the transport protocol which better matches the data exchange communication pattern. A decision what transport protocol to use is made with respect to the following:

l. Static knowledge about transport protocols which is got in advance.

2. Dynamic data to be exchanged parameters such as: communication pattern ID, total amount of data within the pattern, optimization level, etc.

Finally the determined communication Protocol is used for data transmission. Especially, on the sender side, traffic generated by the application is multiplexed between supported transport layer protocols in accordance to the Protocol ID. On the receiver side, data received by different protocols is merged into a single stream to a respective application. There is no need to convey any transport related information to a higher protocol layer, so that the data is conveyed as it is, without any additional fields associated with it.

In FIG. 12, a transmitter of node X 1201 and a receiver of node Y 1208 are displayed. The transmitter of node X comprises a multiplexer 1204 and a number of protocol stacks 1205, 1206 and 1207. Input data 1202 and a respective protocol ID 1203 are handed to the multiplexer 1204, which selects the respective protocol stack 1205 -

1207 based on the protocol ID 1203 and transmits the data 1202 to the receiver of node Y 1208 using the respective protocol stack 1205 - 1207. The receiver of node Y

1208 comprises a number of protocol stacks 1209 - 1211 and a demultiplexer 1212. When receiving data by use of a specific protocol stack 1205 - 1207 of the transmitter of node X 1201, the respective protocol stack 1209 - 1211 of the receiver of node Y

1208 decodes the data which is demultiplexed by the demultiplexer 1212 and provided as output data 1213.

In FIG. 13, a flow diagram of an embodiment of the method for operating a distributed database system is shown. In a first step 130, a logical plan is generated from a received database query. The generated logical plan is then used for generating a physical plan in a second step 131. Therefrom, communication operators within the physical plan are determined in a third step 132. In a fourth step 133, communication patterns of communication operators within the physical plan are determined. In a fifth step 134, the determined communication operators are marked within the physical plan resulting in an extended physical plan. In a sixth step 135, executable code is generated from the extended physical plan. In a seventh step 136, the data markers within the extended physical plan are converted into communicator instructions. In an eighth step 137 the executable code is executed. In a final step 138, communication between different nodes of the distributor database system is performed using the communicator instructions generated in the seventh step 136.

It is moreover pointed out that the elaborations regarding the data handling device are also applicable to the method for operating the distributed database system. In FIG. 14 - FIG. 16, a speedup of database queries using the before-described approaches is shown. Especially, a comparison is performed based on two protocols - TCP and TIPC. As a baseline, a standard approach was applied: In this case, a Peer-to- Peer oriented protocol is used. As a benchmark, a popular TPC-H decision support benchmark is used. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions. A scale factor of 100 which generates about 100Gb of data in tables is used. Here results of a Q8 query are shown. Results of an exemplary execution query Q8 are shown at Fig. 14. Especially, results for Q8 without using Broadcast Hash Join are presented to show performance gains from applying it in both approaches - standard and the approach according to the invention. It is pointed out that on the x-axis, a number of nodes is shown, while on the y-axis, an execution time of the query is depicted.

It can clearly be seen that independent of the number of nodes, the approach according to the present invention is advantageous over the two exemplary solutions.

The depicted improvement though is not as large as might be expected, since in the test underlying Fig. 14, Broadcast Hash Join operation handles relatively small parts of data and its duration doesn't greatly influence the result. To show the benefits of the invention more precisely, a speedup comparison of a single Broadcast Hash Join operation is shown in Fig. 15. Here, on the x-axis a number of nodes is depicted, while on the y-axis a speedup-factor is shown.

It is also interesting to analyze a scalability problem. Therefore, it is analyzed, what the dependency between execution time reduction and a number of nodes in the cluster is. This is shown in Fig. 16. There, a processing time in % versus the baseline TCP without broadcasting is shown on the y-axis, while the number of nodes in shown on the x-axis.

The main conclusion is that with 32 nodes, the use of Broadcast Hash Join, even decreases the performance in comparison to the standard approach. When using the inventive approach though, still a performance gain is shown.

As further alternatives, more than the previously presented communication patterns and the corresponding transport layer protocols can be employed. Different transport layer solutions may aim at some special use cases or features, for example, possible solution for an All-to-All communication pattern may provide a transport protocol which provides fair communications between all nodes.

The proposed approach may be applied to other distributed calculations with a different set of physical operators, for example reductions and prefix scans where for example MPI transport layer can be used.

In the following, some more details regarding the implementation of an embodiment of the computer program according to the third aspect of the invention are given. The software framework is named flint. Flint is a framework for distributed SQL query execution which allows executing of physical query plan represented in C++. This query execution plan, written in C++ can be assumed as an output of Code Generation phase. Below is an example of code for the query Q3 from TPC-H benchmark set. It is shown how the proposed method of code generation may be implemented. The highest level of the Flint Q3 query - the physical plan - has the following

representation:

1 Dataset < Revenue > * query ()

2 { return

Dataset(Lineitem)->

filter<lineitemFilter>(o)->

project<LineitemProjection, projectLineitem>()->

marker(ALL_TO_ALL_PATTERN_ID, lineitemDatasetSize)- > hashJoin<OrdersProjection,long,lineitemOrderKey,orderKey& gt;(

Dataset(Orders)->

filter<orderFilter>()->

project<OrdersProjection, projectOrders>()->

marker(ALL_TO_ALL_PATTER _ID,

ordersDatasetSize) ,

ordersDatasetSize, "l_orderkey", "o_orderkey")-> broadcastHashJoin<CustomerProjection,int,orderCustomerKey ,customerKey >(

Dataset(Customer)->

filter<customerFilter>(2)->

project<CustomerProjection,projectCustomer>(), marker(MULTICAST_PATTERN_ID,

customerDatasetSize) ,

customerDatasetSize, "o_custkey", "c_custkey")-> marker (MANY_TO_ONE_PATTERN_ID, o /* Special value - means that there

is no size estimation */)->

mapReduce<GroupBy, double, map, sum>(ioooooo)->

project< Revenue, revenue>()->

marker(MANY_TO_ONE_PATTER _ID , o)->

top<byRevenueAndOrderDate>(io);

} The data markers that were added in the previous step are translated to special operator - lines 7, 12, 18, 24 in the previous listing - which appends Communication Pattern ID and total data size estimation to the data to be exchanged.

The basis of query implementation presented in the previous listing is a Dataset base class:

1 // Abstract Dataset

2 template < class T>

3 class Dataset

4 {

5 public:

6 virtual bool next(T& record) = 0;

7

8

9 * Default service information structure associated with data

10 7

11 Servicelnfo defaultServicelnfo = {PEER_TO_PEER_PATTERN_ID,

12 o /* Special value - means that there

13 is no size estimation */};

14

15

16 * Returns a pointer to the service information associated with Dataset

17 7

18 virtual Servicelnfo *getServiceInfo() {return &defaultServiceInfo;}

19

20

21

22

23 Associates service information structure with data

24 7

25 Dataset<T>* marker(PatternId id, size_t dataSizeWithinPattern); 26 27 /**

28 * Replicate data between all nodes.

29 * Broadcast local Dataset data to all nodes and gather data from all nodes.

30 * As a result all nodes get the same replicas of input data

31 */

32 Dataset<T>* replicateO;

33

34 /**

35 * Left join two Datasets

36 */

37 template < class I,

38 class K,

39 void (*outerKey)(K& key, T const& outer),

40 void (*innerKey)(K& key, I const& inner) >

41 Dataset< Join<T,I> >* broadcastHashJoin(Dataset<I>* with,

42 size_t estimation,

43 char const* outerKeyName,

44 char const* innerKeyName);

45

46

47 }; where methods marker() has the following implementation:

48 // Implementation of marker() method

49 template< class T>

50 inline Dataset<T>* Dataset<T>::marker(PatternId id, size_t size) {

51 return new MarkedDataset<T>(this, id, size);

52 }

So when a method marker() is called it creates an instance of MarkedDataset class and returns a pointer to it. The class MarkedDataset has the following definition:

53 // Introduced Marked Dataset 54 template < class T>

55 class MarkedDataset : public Dataset<T>

56 {

57 public:

58 bool next(T& record) override {return in->next(record);}

59

60 Servicelnfo *getServiceInfo() override {return servicelnfo;}

. 61

62 MarkedDataset(Dataset<T> *input, Patternld id, size_t size) : in(input) {

63 servicelnfo = new Servicelnfo;

64 servicelnfo- >id = id;

65 servicelnfo- >size = size;

66 }

67 ~MarkedDataset() {

68 delete servicelnfo;

69 }

70 private:

71 Dataset<T> *in;

72 Servicelnfo *serviceInfo;

73 };

Overridden method next() does nothing just conveying record pointer to the method next() of the input Dataset. The most important is an overloading of the

getServicelnfoO method which now returns a pointer to the service information provided to the marker() method.

Newly created service information may be used within the replication phase of Broadcast Hash Join. Below an implementation of broadcastHash Join() method of Dataset class is presented:

74 // Implementation of join () method

75 template<class T>

76 template < class I,

77 class K, 78 void (*outerKey)(K& key, T const& outer) ,

79 void (*innerKey)(K& key, I const& inner) >

80 Dataset< Join<T,I> >* Dataset<T>::broadcastHashJoin(Dataset<I>* with,

81 size_t estimation,

82 char const* outerKeyName,

83 char const* innerKeyName) {

84 • » *

85 return new BroadcastHashJoinDataset<T,I,K,outerKey,innerKey>(this , 86 with,

87 estimation,

88 outerKeyName,

89 innerKeyName);

90 }

As it was for marker() method the broadcastHashJoinO method also creates instance of BroadcastHashJoin class and returns a pointer to it.

91 //

92 // Join two Datasets using hash table

93 //

94 template<class 0,

95 class I,

96 class K,

97 void (*outerKey)(K& key, 0 const& outer),

98 void (*innerKey)(K& key, I const& inner) >

99 class BroadcastHashJoinDataset : public Dataset< Join<0,I> >

100 {

101 public:

102 BroadcastHashJoinDataset(Dataset<0>* outerDataset,

103 Dataset<I>* innerDataset,

104 size_t estimation,

ios char const* outerKeyName,

106 char const* innerKeyName) { 107

108 hashLoader = new Thread(new BuildHashTableJob<I,K,innerKey>(

109 innerDataset->replicate(),

no table,

111 &allocator));

112

113 }

114 };

During construction of a BroadcastHashJoin class a hash table is created on the basis of an inner table which is replicated to each node within the cluster - line 19 of the previous listing.

Below is an implementation of replicate() method and ReplicateDataset class:

: /* Create a separate 133 {}

134...

135};

Replication is implemented through BroadcastJob - line 17 of the previous listing- which prepares and sends specified dataset:

!37// Replicate Dataset replicates data to all nodes i39template< class T>

i40class BroadcastJob : public Job

141 {

142...

144{

145 ···

146 Communicator* communicator = Communicator: instance;

147 Servicelnfo *serviceInfo = input- >getServiceInfo(); /* Get

148 information */

149

150 while (input- >next(record)) {

151 if (size + sizeof(T) > bufferSize) {

152 buffer->size = size;

153

154 /* And convey this info to the communicator */

155 communicator- > sendBroadcast(buffer , service

156

157 buffer = Buffer: :create(queue->qid, bufferSize);

158 size = o;

159

160

161 t packed = pack(record, buffer->data + size); 162 size += packed;

163 assert(size <= bufferSize);

164 }

165 ...

166}

167}

Finally the Service information associated with the MarkedDataset is gathered - line 12 of the previous listing and conveyed - line 20 of the previous listing - to the

Communicator along with the data to be sent.

The same approach is used for other communication patterns. For example, a

ScatterJob which implements shuffle procedure used by Hash Join and Shuffle Join physical plan operators:

169// Scatter RDD data between nodes using provided distribution key and hash function

171 template < class T, class K, void (*dist_key)(K& key, T const& record) >

172 class ScatterJob : public Job

173 {

174...

175 void run()

176 {

177 ...

178 Communicator* communicator = Communicator: tinstance;

179 Servicelnfo *serviceInfo = input- >getServiceInfo(); /* Get service

180 information */

181 ...

182

183 for (size_t i = o; i < nNodes; i++) {

184 buffers[i] = Buffer: :create(queue->qid, bufferSize);

185 buffers [i]-> size = o; 186 }

187

188 while (input- >next(record)) {

189 dist_key(key, record);

190 size_t hash = hashCode(key);

191 size_t node = hash % nNodes;

192

193 if (buffers[node]->size + sizeof(T) > bufferSize) {

194 communicator- >sendUnicast(buffers [node], node, servicelnfo);

195 buffers[node] = Buffer: :create(queue->qid, bufferSize);

196 buffers[node]->size = o;

197 }

198 size_t size = pack( record, buffers [node] -> data + buffers [node] -> size);

199 assert(size <= sizeof(T));

200 buffers [node] -> size += size;

201 }

202

203 for (size_t node = o; node < nNodes; node++) {

204 if (buffers [node] -> size != 0) {

205 sent += buffers [node] -> size;

206 communicator- >sendUnicast(buffers [node], node, servicelnfo);

207 communicator->sendUnicast(Buffer::eof(queue->qid), node, servicelnfo);

208 } else {

209 buffers[node]->kind = MSGJEOF;

210 communicator- >sendUnicast(buffers [node], node, servicelnfo);

211 }

212 }

213 ... The invention is not limited to the examples shown above. The characteristics of the exemplary embodiments can be used in any advantageous combination.

The invention has been described in conjunction with various embodiments herein. However, other variations to the disclosed embodiments can be understood and effected by those skilled in the art in practicing the claimed invention, from a study of the drawings, the disclosure and the appended claims. In the claims, the word

"comprising " does not exclude other elements or steps and the indefinite article "a" or "an" does not exclude a plurality. A single processor or other unit may fulfill the functions of several items recited in the claims. The mere fact that certain measures are recited in usually different dependent claims does not indicate that a combination of these measures cannot be used to advantage. A computer program may be stored/distributed on a suitable medium, such as an optical storage medium or a solid-state medium supplied together with or as part of other hardware, but may also be distributed in other forms, such as via the internet or other wired or wireless communication systems.