Title:
GRAPH ANALYTICS SYSTEM
Document Type and Number:
WIPO Patent Application WO/2017/131794
Kind Code:
A1
Abstract:
Examples of graph analytics are described. In one example, an apparatus for graph analytics has multiple processing contexts and shared memory for storing graph data. The apparatus also includes a graph insertion interface to insert new graph data into the shared memory and a graph query interface for processing a graph query. A message passing system communicates messages between the multiple processing contexts, and the graph insertion interface and the graph query interface are arranged to allocate tasks to the multiple processing contexts and distribute the allocated tasks via the message passing system. The message passing system may be used to send a task stealing request from a first processing context to a second processing context.

Inventors:
SCHEIN SAGI (IL)
NOR IGOR (IL)
BARKOL OMER (IL)
HAYUN EYAL (IL)
Application Number:
PCT/US2016/015863
Publication Date:
August 03, 2017
Filing Date:
January 31, 2016
Assignee:
HEWLETT PACKARD ENTPR DEV COMPANY LP (US)
International Classes:
G06F17/30; G06F17/00
Foreign References:
US20150026158A12015-01-22
US20060195508A12006-08-31
US20140019490A12014-01-16
US20150112986A12015-04-23
Other References:
UMUT A. ACAR ET AL.: "Scheduling Parallel Programs by Work Stealing with Private Deques", in Proceedings of the 19th ACM SIGPLAN Symposium on Principles and Practice of Parallel Programming (PPoPP '13), 23 February 2013, XP058030576. Retrieved from the Internet
Attorney, Agent or Firm:
DE MERLIER, Mallary K. et al. (US)
Claims:
CLAIMS

What is claimed is:

1. An apparatus for graph analytics, comprising:

a distributed processing system for performing processing operations using multiple processing contexts;

shared memory for storing graph data, the shared memory being accessible by the multiple processing contexts;

a graph insertion interface for insertion of graph data in the shared memory;

a graph query interface for processing a graph query; and

a message passing system for communication of messages between the multiple processing contexts,

wherein the graph insertion interface and the graph query interface are arranged to allocate tasks to the multiple processing contexts and distribute the allocated tasks via the message passing system.

2. The apparatus according to claim 1, wherein a first processing context is arranged to request a task from a second processing context via the message passing system.

3. The apparatus of claim 2, wherein the first processing context is arranged to send the request in response to detection that the first processing context has no outstanding tasks.

4. The apparatus according to claim 2, wherein the first processing context is arranged to select the second processing context from the multiple processing contexts using a pseudo-random process.

5. The apparatus according to claim 2, wherein the first processing context is arranged to determine the second processing context from the multiple processing contexts in accordance with statistical data for task distribution between the multiple processing contexts.

6. The apparatus according to claim 5, further comprising a task monitor to track distribution of tasks between the multiple processing contexts,

wherein the first processing context is arranged to consult the task monitor via the message passing system to determine the second processing context.

7. The apparatus according to claim 2, wherein the second processing context is arranged to give the request for a task from the first processing context a higher priority than all other task requests.

8. The apparatus according to claim 1, wherein the shared memory comprises non-volatile memory.

9. A method of processing graph data stored in shared memory by a plurality of processing contexts having access to the shared memory, the method comprising by a first processing context:

receiving at least one task relating to a processing request for processing graph data stored in the shared memory;

performing the at least one task using a portion of the graph data; and

in the event of the first processing context having a shortage of tasks, requesting a task from a second processing context.

10. The method of claim 9, wherein the first processing context requests a task from the second processing context via a message passing system.

11. The method of claim 9, wherein the processing request comprises a graph query.

12. The method of claim 11, wherein the graph query is a k-hop query.

13. The method of claim 9, wherein the second processing context prioritizes the request for a task from the first processing context.

14. A non-transitory computer-readable medium comprising a set of computer-readable instructions stored thereon, which, when executed by a processing context, cause the processing context to:

receive tasks for processing a graph operation on graph data stored in a memory shared by multiple processing contexts;

monitor the number of tasks to be performed by the processing context; and

following performance of the outstanding tasks, request a task from another of the multiple processing contexts.

Description:
GRAPH ANALYTICS SYSTEM

BACKGROUND

[0001] Many situations arise in which time-varying data is produced. For example, social media feeds, system logs, telecommunications systems and network monitoring applications may all generate records of events that occur over time. This time-varying data can be represented in a graph structure.

BRIEF DESCRIPTION OF THE DRAWINGS

[0002] Various features of the present disclosure will be apparent from the detailed description which follows, taken in conjunction with the accompanying drawings, which together illustrate features of certain examples, and in which:

[0003] Figure 1 is a schematic illustration of apparatus for graph analytics according to an example;

[0004] Figure 2 is a schematic illustration of a worker node forming part of the apparatus shown in Figure 1 according to an example;

[0005] Figure 3 is a flow chart showing operations performed by the worker node according to an example; and

[0006] Figure 4 is a schematic illustration showing a computer device for graph analytics according to an example.

DETAILED DESCRIPTION

[0007] In a social media feed a user may add or remove other users. In a telecommunications system, connectivity between a mobile device and a base station may vary over time. Similarly, repositories of electronic documents, such as the HyperText Markup Language (HTML) documents forming the World Wide Web, change over time as new documents get added and old documents are removed or modified. It is frequently desirable to quickly and efficiently store and/or retrieve such data.

[0008] It is frequently desirable to analyze time-varying data, which may be received as streaming data (i.e. a "data stream"). For example, the data may be indicative of connections at given times between entities in a network such as a computer network or a telecommunication system. As another example, the data may be indicative of e-commerce transactions wherein connections between users represent transactions between those users. In certain examples described herein this time-varying or "dynamic" data is represented as a graph in a graph data structure. This then allows the data to be analyzed using graph processing techniques.

[0009] When analyzing time-varying data, it may be desirable to analyze an up-to-date representation of the data and/or a representation of the data at a given time, or over a given time range, in the past. This time-based querying presents challenges for data storage and/or retrieval. Processing an incoming or new event in a data stream may take a given amount of time and computing resources, and this may vary depending on the processing methods that are being used. In one case, a complete copy of a graph data structure could be stored every time a change in the data stream occurs. This would take considerable time and resources at the time of storage but would aid rapid retrieval of data. In certain cases it may not be possible to appropriately handle changes if they are received too frequently and/or the data structure becomes too large. In another case, changes may be stored in a log file. This would enable rapid storage of data. However, in this case, a new graph data structure would need to be constructed and processed with each data retrieval operation. As such, time-based queries may take a considerable amount of time to perform and/or fail to complete for large datasets and constrained resources. Given this, there is a desire for time- and resource-efficient methods for storing and loading time-varying data, in particular where that data may be represented as graph data.

[0010] Certain examples described herein allow for useful processing of time-varying data. Certain examples provide apparatus for graph analytics having a distributed processing system and a shared memory system for storing graph data. The distributed processing system has multiple processing contexts, each processing context being able to perform a processing task independently from the other processing contexts. The apparatus also includes a graph insertion interface to insert new graph data into the shared memory and a graph query interface for processing a graph query. A message passing system communicates messages between the multiple processing contexts, and the graph insertion interface and the graph query interface are arranged to allocate tasks to the multiple processing contexts and distribute the allocated tasks via the message passing system.

[0011] Certain examples provide a method of processing graph data stored in shared memory by a plurality of processing contexts having shared access to the memory. A first processing context receives at least one task relating to a processing request for processing graph data stored in the shared memory and performs the at least one task using a portion of the graph data. In the event of the first processing context having a shortage of tasks, the first processing context can request a task from a second processing context. In this way, load imbalance between the multiple processing contexts may be mitigated.
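
To make the method concrete, the following is a minimal C++ sketch of the worker-side loop. The Task type, the deque-based task queue and the requestTaskFromPeer() hook are illustrative assumptions; the disclosure does not fix these details.

```cpp
// A minimal sketch of the worker-side method; types are illustrative.
#include <deque>
#include <functional>
#include <utility>

struct Task {
    std::function<void()> run;  // works on a portion of the shared graph data
};

class ProcessingContext {
public:
    // Tasks arrive from the dispatcher via the message passing system.
    void enqueue(Task t) { queue_.push_back(std::move(t)); }

    // One scheduling step: perform a queued task, or, on a shortage of
    // tasks, request a task from a second processing context.
    void step() {
        if (!queue_.empty()) {
            Task t = std::move(queue_.front());
            queue_.pop_front();
            t.run();
        } else {
            requestTaskFromPeer();  // "task stealing" over the message system
        }
    }

private:
    void requestTaskFromPeer() {
        // Placeholder: would send a steal request via the message passing
        // system, as described with reference to Figure 3 below.
    }
    std::deque<Task> queue_;
};
```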

[0012] Figure 1 schematically shows a system 100 for processing a time-varying graph according to an example. In this example, the distributed system includes a master node 101 and a plurality of worker nodes 103a to 103n, hereafter referred to as worker nodes 103, interconnected by a message passing system 105. In an example, the message passing system is the Message Passing Interface (MPI).
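
Where MPI is the message passing system, the master/worker layout of Figure 1 could be set up along the following lines; treating rank 0 as the master node is an assumption for illustration.

```cpp
// Minimal MPI scaffold for the master/worker layout of Figure 1.
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank = 0, size = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (rank == 0)
        std::printf("master node, %d worker nodes\n", size - 1);
    else
        std::printf("worker node %d\n", rank);  // would run the task loop here
    MPI_Finalize();
    return 0;
}
```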

[0013] In this example, each of the master node 101 and the worker nodes 103 is formed by a system on a chip. As shown in Figure 1, in this example the plurality of worker nodes 103 have respective core processors 107a to 107n, hereafter referred to as core processors 107. The core processors 107 form part of a distributed processing system, and each core processor 107 implements at least one processing context such that the distributed processing system performs processing operations using multiple processing contexts. Each processing context may execute an associated sequence of tasks on a processor core.

[0014] The message passing system 105 allows for communication of messages between processing contexts of the master node 101 and the worker nodes 103, and also allows for communication of messages directly between the processing contexts of the worker nodes 103. Although schematically shown in Figure 1 as being outside of the master node 101 and the worker nodes 103 for ease of illustration, it will be appreciated that parts of the message passing system 105 may be provided in the systems on a chip for the master node 101 and the worker nodes 103.

[0015] In an example, the worker nodes 103 can access time-varying graph data stored by a shared memory 109 via a memory management module 111. The shared memory 109 distributes the graph data between a plurality of non-volatile memories 113a to 113m, hereafter referred to as NVMs 113. The memory management module 111 provides the worker nodes 103 with shared access to the shared memory 109. Although the memory management module 111 is schematically shown as being outside the worker nodes 103 and the shared memory 109 for ease of illustration, it will be appreciated that at least part of the memory management module 111 may be located in the same hardware devices as the worker nodes 103 and the NVMs 113.

[0016] The NVMs 113 may be any form of non-volatile memory. In an example, the NVMs may be resistive memory such as memristor technology.

[0017] In an example, as schematically shown in Figure 1, the master node 101 has program code for a graph insertion interface 115, a graph query interface 117, a task dispatcher 119, a graph composer 121 and a task monitor 123.

[0018] The graph insertion interface 115 receives data from a data source 125, for example a telecommunications network or a social network, and inserts the data as new vertices or edges in the time-varying graph stored by the shared memory 109. In an example, this is achieved by the graph insertion interface 115 allocating tasks for achieving the data insertion to the plurality of worker nodes 103. This allocation may be made on the basis that each worker node is responsible for a respective portion of the time-varying graph, for example the portion of the time-varying graph stored on one of the NVMs 113. The tasks are then sent by the task dispatcher 119, via the message passing system 105, to their respective worker nodes 103.

[0019] The graph query interface 117 receives and processes a query from a query application 127, which may be for example a web application. An example of a query that may be received is a "k-hop" query for a prescribed time interval, which requests details of all paths from a query vertex, or a query set of vertices, of the time-varying graph having a depth of up to k edges over the prescribed time interval. The graph query interface 117 allocates tasks for implementing the query to the plurality of worker nodes 103, and then the allocated tasks are sent by the task dispatcher 119 to their respective worker nodes via the message passing system 105. Reply data for the query is then received from the worker nodes 103 via the message passing system 105, and collated in the graph composer 121. The collated data is then processed by the graph query interface 117, which may identify new tasks to be allocated and generate answer data for sending to the query application 127.
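
One plausible way to realize the per-worker allocation described above is to hash vertex identifiers across the worker nodes. The disclosure only states that each worker is responsible for a portion of the graph, so the hashing scheme below is an illustrative assumption.

```cpp
// Sketch: map a vertex of the time-varying graph to the worker node
// responsible for it, so insert and query tasks can be dispatched there.
#include <cstdint>
#include <functional>

std::uint32_t workerForVertex(std::uint64_t vertexId, std::uint32_t numWorkers) {
    return static_cast<std::uint32_t>(
        std::hash<std::uint64_t>{}(vertexId) % numWorkers);
}
```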

[0020] Figure 2 shows an example of a worker node 203 according to an example. The worker node 203 has a message passing system 231, a task queue 233, a vertex processor 235, vertex logic for insert and select functions 237, a memory interface 239 and a task dispatcher 241. The task queue 233 stores outstanding tasks for the worker node 203. In an example, the task queue 233 and the vertex logic 237 are implemented in software. Alternatively, the task queue 233 and the vertex logic 237 could be implemented in hardware or a combination of software and hardware. The vertex processor 235 performs operations on stored graph data in performance of queued tasks. The nature of these operations is determined in accordance with the vertex logic 237, which specifies separate operations for an insert function and a select function. In an example, the vertex processor 235 is implemented by a processing context, in the form of software executed on a core processor 107.

[0021] The message passing system 231, the memory interface 239 and the task dispatcher 241 are, in an example, implemented by a combination of software and hardware.

[0022] Figure 3 is a flow chart showing an example of a method 300 for processing task requests. Although the execution of the method 300 is described below with reference to the system of Figure 2, the components for executing the method 300 may be spread among multiple devices. The method 300 may be implemented in the form of executable instructions stored on a machine-readable storage medium, and/or in the form of electrical circuitry.

[0023] As shown in Figure 3, following receipt of at least one task relating to a processing request by the message passing system 231, at S1, a task in the task queue 233 is performed, at S3, by the vertex processor 235 using the vertex logic 237. For an insert function, the vertex logic 237 causes a new edge and/or a new vertex to be added to the time-varying graph stored in the shared memory 109. For a select function, the vertex logic causes data identifying edges connecting with a subject vertex to be retrieved from the time-varying graph and causes dispatch of a task including the retrieved data by the task dispatcher 241 to the master node 101 via the message passing system 231. For the example of a k-hop query, the worker node 203 performs a local breadth-first search up to a depth of k hops, and then passes the set of resultant edges to the master node 101 for further processing.
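
The local breadth-first search for a k-hop select task might look like the following sketch. The TimedEdge layout and the adjacency map are assumptions for illustration; the disclosure describes the traversal, not the concrete types.

```cpp
// Sketch: local BFS up to k hops over time-stamped edges, restricted to
// the prescribed time interval [t0, t1].
#include <cstdint>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

struct TimedEdge { std::uint64_t to; std::int64_t time; };
using Adjacency = std::unordered_map<std::uint64_t, std::vector<TimedEdge>>;

std::vector<TimedEdge> kHopLocal(const Adjacency& graph, std::uint64_t start,
                                 int k, std::int64_t t0, std::int64_t t1) {
    std::vector<TimedEdge> result;
    std::unordered_set<std::uint64_t> visited{start};
    std::queue<std::pair<std::uint64_t, int>> frontier;
    frontier.push({start, 0});
    while (!frontier.empty()) {
        auto [v, depth] = frontier.front();
        frontier.pop();
        if (depth == k) continue;          // depth limit reached
        auto it = graph.find(v);
        if (it == graph.end()) continue;   // vertex stored on another worker
        for (const TimedEdge& e : it->second) {
            if (e.time < t0 || e.time > t1) continue;  // outside the interval
            result.push_back(e);
            if (visited.insert(e.to).second)
                frontier.push({e.to, depth + 1});
        }
    }
    return result;                         // returned to the master node
}
```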

[0024] In an example, the worker node 203 then checks, at S5, for a shortage of tasks in the task queue 233. For example, the worker node 203 may determine if there are no tasks outstanding in the task queue 233. If there are tasks in the task queue 233, then the worker node 203 performs the next task in the task queue. If there are no tasks in the task queue 233, however, the worker node 203 sends, at S7, a request for a task, hereafter called a task stealing request, directly to another worker node 203 via the message passing system. As such, a task stealing request is a request from a first processing context to a second processing context that the second processing context sends a task in its task queue 233 to the first processing context for execution.
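
Over MPI, a task stealing request could be as simple as a tagged point-to-point message; the tag value and the empty payload below are illustrative assumptions.

```cpp
// Sketch: a first processing context with an empty queue asks the chosen
// second processing context (rank `victim`) for a task.
#include <mpi.h>

const int TAG_STEAL_REQUEST = 100;  // assumed tag for steal requests

void sendStealRequest(int victim) {
    int dummy = 0;  // no payload needed; the tag identifies the request
    MPI_Send(&dummy, 1, MPI_INT, victim, TAG_STEAL_REQUEST, MPI_COMM_WORLD);
}
```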

[0025] There are various techniques that can be used to determine the worker node 203 that is sent the task stealing request. In an example, the recipient of the task stealing request is determined by a pseudo-random process that, in effect, identifies the recipient in a manner that is not dependent on work loads of the processing contexts or any form of deterministic listing. Alternatively, the worker node 203 can obtain statistical data about the distribution of tasks between all or a subset of the worker nodes from a task monitor, such as the task monitor 123 of the master node 101, which tracks the distribution of tasks between multiple processing contexts. In an example, a first processing context is arranged to determine the second processing context in accordance with this statistical data for task distribution between the multiple processing contexts. In another example, the task monitor 123 tracks the distribution of tasks between the processing contexts, and the first processing context consults the task monitor via the message passing system to determine the second processing context.
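
The two victim-selection strategies described above might be sketched as follows; the load vector standing in for the task monitor's statistics is an illustrative assumption.

```cpp
// Sketch: choosing the second processing context for a steal request.
#include <random>
#include <vector>

// Pseudo-random selection, independent of work loads.
int randomVictim(int numWorkers, int self, std::mt19937& rng) {
    std::uniform_int_distribution<int> dist(0, numWorkers - 2);
    int v = dist(rng);
    return v < self ? v : v + 1;  // skip the requesting context itself
}

// Statistics-based selection: pick the context with the most queued
// tasks, as reported by the task monitor.
int busiestVictim(const std::vector<int>& queuedTasksPerWorker, int self) {
    int best = (self == 0) ? 1 : 0;
    for (int i = 0; i < static_cast<int>(queuedTasksPerWorker.size()); ++i)
        if (i != self && queuedTasksPerWorker[i] > queuedTasksPerWorker[best])
            best = i;
    return best;
}
```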

[0026] In an example, a factor that may be taken into account when determining whether to send a task stealing request is the extent to which graph data would need to be transferred from the shared memory 109 to local memory at the worker node 203 in order to perform the task.

[0027] The ability to steal tasks from other worker nodes can address an imbalance in the distribution of tasks. Such an imbalance can occur as a result of a local concentration of tasks due to the specific distribution of vertices in the time-varying graph, or as a result of a wide variation in the degrees of vertices in the graph.

[0028] On receipt of a task stealing request, a worker node gives the task stealing request a higher priority than all other pending requests so that the task stealing request is the next task processed. The worker node then determines whether to comply with the task stealing request by sending a pending task to the worker node that sent the task stealing request, or to disregard the task stealing request.
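
With MPI as the message passing system, one way a worker could give task stealing requests priority over other pending work is to probe for the steal tag at the start of each scheduling step; the tag value is an illustrative assumption.

```cpp
// Sketch: check whether a task stealing request is pending, so it can be
// serviced before any other queued work.
#include <mpi.h>

const int TAG_STEAL_REQUEST = 100;  // assumed tag for steal requests

bool stealRequestPending() {
    int flag = 0;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, TAG_STEAL_REQUEST, MPI_COMM_WORLD,
               &flag, &status);
    return flag != 0;
}
```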

[0029] The above-described techniques can be applied to many different types of time-varying data stored in the form of graph data, that is, stored as a list of vertices with each vertex being associated with a list of time-dependent edges indicating connections to other vertices and the times at which the connections occur. Time-varying data may relate to an event stream, e.g. a series of events wherein each event involves two or more entities and occurs at a defined time. The entities may be computers in a network, and the at least one relationship may indicate a connection between these computers, e.g. as extracted by a packet sniffing tool. As another example, the entities may comprise elements of a telecommunications network, such as mobile devices and base stations or core network elements, with the at least one relationship similarly representing a connection between these elements. As a further example, the entities may represent users in a computing application, e.g. as identified by a user account having an appropriate user identifier. The relationship in this case may comprise adding a particular second user to a user list of a first user. In another case, the relationship may comprise a hyperlink and the entities may comprise HTML documents on the World Wide Web. It will be clear to one skilled in the art that the presently described method is applicable to other forms of data indicating relationships between entities.

[0030] The time-varying data may be stored in an augmented form of an adjacency list representation of a time-varying graph, based on the time-varying data. Vertices in the adjacency list representation correspond to entities in the time-varying data and each vertex has a corresponding neighbor list. The neighbor list indicates vertices with which connections occur to the vertex in question at specific times. In certain cases, the adjacency list representation may comprise a plurality of array data structures. For example, a first array data structure may store a list of vertices and each entry in the list of vertices may have a corresponding linked array data structure implementing a neighbor list. These array structures may be dynamic, e.g. they may change in size over time.
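
A minimal sketch of this augmented adjacency list, assuming illustrative field names, might look as follows.

```cpp
// Sketch: vertex array, each entry owning a dynamic neighbor list of
// time-stamped connections.
#include <cstdint>
#include <vector>

struct Neighbor {
    std::uint64_t vertexId;  // the connected vertex
    std::int64_t  time;      // when the connection occurred
};

struct VertexEntry {
    std::uint64_t id;
    std::vector<Neighbor> neighbors;  // dynamic array, grows over time
};

// First array data structure: the list of vertices, each with its
// linked neighbor-list array.
using TimeVaryingGraph = std::vector<VertexEntry>;
```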

[0031] A distinct chain representation may be stored for each entry in each neighbor list. The distinct chain representation indicates distinct elements of a corresponding neighbor list. As such, if a given element is repeated in a neighbor list, the distinct chain representation would indicate one instance of that element. The distinct chain representation may be implemented as a hash table.
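
For example, a distinct chain backed by a hash table could be sketched as below, collapsing repeated neighbor identifiers to one instance each.

```cpp
// Sketch: build a distinct chain from a neighbor list that may repeat
// vertex ids at different times; each distinct neighbor appears once.
#include <cstdint>
#include <unordered_set>
#include <vector>

std::unordered_set<std::uint64_t>
distinctChain(const std::vector<std::uint64_t>& neighborIds) {
    return std::unordered_set<std::uint64_t>(neighborIds.begin(),
                                             neighborIds.end());
}
```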

[0032] Certain examples described above enable events from a data stream (time-varying data) to be efficiently stored. If data is stored as described above, efficient loading or retrieval of data is enabled. An example of retrieving a portion of data corresponding to a particular time interval will now be described.

[0033] Given an adjacency list representation, a retrieval operation begins by searching vertices of the vertex list to locate any vertex with an associated relationship time within the time interval. This may for example be a binary search. A second search, which may also be a binary search, is then performed within the neighbor list of the located vertex, to find a vertex with an associated relationship time within the time interval. This may for example correspond to the first or last relationship in the neighbor list within the time interval. Elements of the adjacency list representation may then be retrieved by starting at the determined element and traversing, based on the neighbor pointers, elements within neighbor lists that have an associated relationship time within the time interval. This may for example be a breadth-first search. This retrieval operation retrieves vertices and edges or connections from the graph data structure that are within the queried time interval.
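
The second, within-neighbor-list binary search might be sketched as follows, assuming each neighbor list is kept sorted by connection time.

```cpp
// Sketch: binary search within a time-sorted neighbor list for the first
// connection at or after the start of the queried interval.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Neighbor { std::uint64_t vertexId; std::int64_t time; };

// Returns the index of the first neighbor with time >= t0, or the list
// size when no connection falls at or after t0.
std::size_t firstInInterval(const std::vector<Neighbor>& sortedByTime,
                            std::int64_t t0) {
    auto it = std::lower_bound(sortedByTime.begin(), sortedByTime.end(), t0,
                               [](const Neighbor& n, std::int64_t t) {
                                   return n.time < t;
                               });
    return static_cast<std::size_t>(it - sortedByTime.begin());
}
```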

[0034] When retrieving graph data for a requested time or time interval, the distinct chain representation is used to determine all vertices that have connections that occur within that time interval. In this case, it is deemed sufficient to know that at least one connection occurred between two vertices in a requested time interval, without returning further information regarding the nature of any connections within the time interval. By returning information regarding entities active within a time period, but reducing the amount of information regarding specific connections within that time period, the speed of graph retrieval operations may be increased.

[0035] Certain system components and methods described herein may be implemented by way of non-transitory computer program code that is storable on a non-transitory storage medium. Figure 4 shows an example of a system comprising at least one processor 401 arranged to retrieve data from a computer-readable storage medium 403. The computer-readable storage medium 403 comprises a set of computer-readable instructions stored thereon. The set of computer-readable instructions is arranged to cause at least one processing context to perform a series of actions. Instructions 405 are arranged to receive tasks for processing a graph operation on graph data stored in a distributed memory shared by a plurality of processing contexts. Instructions 407 are arranged to monitor the number of outstanding tasks to be performed by the processing context. Instructions 409 are arranged to request a task from another of the plurality of processing contexts following performance of the received tasks.

[0036] The non-transitory storage medium can be any media that can contain, store, or maintain programs and data for use by or in connection with an instruction execution system. Machine-readable media can comprise any one of many physical media such as, for example, electronic, magnetic, optical, electromagnetic, or semiconductor media. More specific examples of suitable machine-readable media include, but are not limited to, a hard drive, a random access memory (RAM), a read-only memory (ROM), an erasable programmable read-only memory, or a portable disc.

[0037] The preceding description has been presented to illustrate and describe examples of the principles described. This description is not intended to be exhaustive or to limit these principles to any precise form disclosed. Many modifications and variations are possible in light of the above teaching. It is to be understood that any feature described in relation to any one example may be used alone, or in combination with other features described, and may also be used in combination with any features of any other of the examples, or any combination of any other of the examples.