/*
 * libopensync - A synchronization framework
 * Copyright (C) 2004-2005  Armin Bauer <armin.bauer@opensync.org>
 * 
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 * 
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
 * 
 */

#include "opensync.h"
#include "opensync_internals.h"

#include "opensync-context.h"
#include "opensync-plugin.h"
#include "opensync-ipc.h"
#include "opensync-serializer.h"
#include "opensync-format.h"
#include "opensync-client.h"
#include "opensync_client_internals.h"
#include "opensync-version.h"
#include "opensync-merger.h"

typedef struct callContext {
	OSyncClient *client;
	OSyncMessage *message;
	OSyncChange *change;
} callContext;

static OSyncContext *_create_context(OSyncClient *client, OSyncMessage *message, OSyncContextCallbackFn callback, OSyncChange *change, OSyncError **error)
{
	OSyncContext *context = osync_context_new(error);
	if (!context)
		goto error;
	
	callContext *baton = osync_try_malloc0(sizeof(callContext), error);
	if (!baton)
		goto error_free_context;
	
	baton->client = client;
	osync_client_ref(baton->client);
	
	baton->message = message;
	osync_message_ref(message);
	
	baton->change = change;
	if (baton->change)
		osync_change_ref(baton->change);
		
	osync_context_set_callback(context, callback, baton);
	return context;
	
error_free_context:
	osync_context_unref(context);
error:
	return FALSE;
}

static void _free_baton(callContext *baton)
{
	osync_client_unref(baton->client);
	osync_message_unref(baton->message);
	
	if (baton->change)
		osync_change_unref(baton->change);
	
	g_free(baton);
}

static void _osync_client_connect_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
		//Send connect specific reply data
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));

	_free_baton(baton);
	
	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;
	
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_disconnect_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));

	_free_baton(baton);
	
	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;
	
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_get_changes_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
		//Send get_changes specific reply data
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));

	_free_baton(baton);
	
	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;
	
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_change_callback(OSyncChange *change, void *data)
{
	callContext *baton = data;
	OSyncError *locerror = NULL;
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, change, data);
	
	OSyncClient *client = baton->client;
	OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NEW_CHANGE, 0, &locerror);
	if (!message)
		goto error;

	if (!osync_marshal_change(message, change, &locerror))
		goto error_free_message;

	if (!osync_queue_send_message(client->outgoing, NULL, message, &locerror))
		goto error_free_message;
	
	osync_message_unref(message);
	
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(message);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_ignored_conflict_callback(OSyncChange *change, void *data)
{
	callContext *baton = data;
	OSyncError *locerror = NULL;
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, change, data);
	
	OSyncClient *client = baton->client;
	OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_READ_CHANGE, 0, &locerror);
	if (!message)
		goto error;

	if (!osync_marshal_change(message, change, &locerror))
		goto error_free_message;

	if (!osync_queue_send_message(client->outgoing, NULL, message, &locerror))
		goto error_free_message;
	
	osync_message_unref(message);
	
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(message);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_read_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	osync_trace(TRACE_INTERNAL, "ignored chnaged: %p", baton->change);

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
		//Send get_changes specific reply data
		osync_message_write_string(reply, osync_change_get_uid(baton->change));
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));


	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;

	_osync_client_ignored_conflict_callback(baton->change, baton);

	_free_baton(baton);

	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_commit_change_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
		//Send get_changes specific reply data
		osync_message_write_string(reply, osync_change_get_uid(baton->change));
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));

	_free_baton(baton);
	
	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;
	
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_committed_all_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
		//Send get_changes specific reply data
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));

	_free_baton(baton);
	
	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;
	
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static void _osync_client_sync_done_callback(void *data, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, data, error);
	OSyncError *locerror = NULL;
	callContext *baton = data;

	OSyncMessage *message = baton->message;
	OSyncClient *client = baton->client;

	OSyncMessage *reply = NULL;
	if (!osync_error_is_set(&error)) {
		reply = osync_message_new_reply(message, &locerror);
		//Send get_changes specific reply data
	} else {
		reply = osync_message_new_errorreply(message, error, &locerror);
	}
	if (!reply)
		goto error;
	osync_trace(TRACE_INTERNAL, "Reply id %lli", osync_message_get_id(reply));

	_free_baton(baton);
	
	if (!osync_queue_send_message(client->outgoing, NULL, reply, &locerror))
		goto error_free_message;
	
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
	
error_free_message:
	osync_message_unref(reply);
error:
	_free_baton(baton);
	osync_client_error_shutdown(client, locerror);
	osync_error_unref(&locerror);
	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

static osync_bool _osync_client_handle_initialize(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	OSyncMessage *reply = NULL;
	char *enginepipe = NULL;
	char *pluginname = NULL;
	char *plugindir = NULL;
	char *groupname = NULL;
	char *configdir = NULL;
	char *formatdir = NULL;
	char *config = NULL;
	OSyncQueue *outgoing = NULL;
	
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
	
	osync_message_read_string(message, &enginepipe);
	osync_message_read_string(message, &formatdir);
	osync_message_read_string(message, &plugindir);
	osync_message_read_string(message, &pluginname);
	osync_message_read_string(message, &groupname);
	osync_message_read_string(message, &configdir);
	osync_message_read_string(message, &config);
	
	osync_trace(TRACE_INTERNAL, "enginepipe %s, formatdir %s, plugindir %s, pluginname %s", enginepipe, formatdir, plugindir, pluginname);
		
	/* First we connect the engine pipe if necessary*/
	if (enginepipe) {
		outgoing = osync_queue_new(enginepipe, error);
		if (!outgoing)
			goto error;
		
		osync_trace(TRACE_INTERNAL, "connecting to engine");
		
		if (!osync_queue_connect(outgoing, OSYNC_QUEUE_SENDER, error)) {
			osync_queue_free(outgoing);
			goto error;
		}
		
		osync_client_set_outgoing_queue(client, outgoing);
		osync_trace(TRACE_INTERNAL, "done connecting to engine");
	}
	
	if (!client->plugin) {
		client->plugin_env = osync_plugin_env_new(error);
		if (!client->plugin_env)
			goto error;
		
		if (!osync_plugin_env_load(client->plugin_env, plugindir, error))
			goto error;
	
		client->plugin = osync_plugin_env_find_plugin(client->plugin_env, pluginname);
		if (!client->plugin) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find plugin %s", pluginname);
			goto error;
		}
		osync_plugin_ref(client->plugin);
	}
	
	client->format_env = osync_format_env_new(error);
	if (!client->format_env)
		goto error;
	
	if (!osync_format_env_load_plugins(client->format_env, formatdir, error))
		goto error;
	
	client->plugin_info = osync_plugin_info_new(error);
	if (!client->plugin_info)
		goto error;
	
	osync_plugin_info_set_configdir(client->plugin_info, configdir);
	osync_plugin_info_set_config(client->plugin_info, config);
	osync_plugin_info_set_loop(client->plugin_info, client->context);
	osync_plugin_info_set_format_env(client->plugin_info, client->format_env);
	osync_plugin_info_set_groupname(client->plugin_info, groupname);
	
	client->plugin_data = osync_plugin_initialize(client->plugin, client->plugin_info, error);
	if (!client->plugin_data)
		goto error;

	reply = osync_message_new_reply(message, error);
	if (!reply)
		goto error_finalize;

	if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
		goto error_free_message;
	
	osync_message_unref(reply);
		
	g_free(enginepipe);
	g_free(pluginname);
	g_free(configdir);
	g_free(plugindir);
	g_free(groupname);
	g_free(formatdir);
	g_free(config);
	
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;
	
error_free_message:
	osync_message_unref(reply);
error_finalize:
	osync_plugin_finalize(client->plugin, client->plugin_data);
error:
	g_free(enginepipe);
	g_free(pluginname);
	g_free(configdir);
	g_free(plugindir);
	g_free(groupname);
	g_free(formatdir);
	g_free(config);
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_finalize(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);
	
	if (client->plugin) {
		if (client->plugin_data)
			osync_plugin_finalize(client->plugin, client->plugin_data);
		
		osync_plugin_unref(client->plugin);
		client->plugin = NULL;
	}
	
	if (client->plugin_env) {
		osync_plugin_env_free(client->plugin_env);
		client->plugin_env = NULL;
	}
	
	if (client->plugin_info) {
		osync_plugin_info_unref(client->plugin_info);
		client->plugin_info = NULL;
	}
	
	if (client->format_env) {
		osync_format_env_free(client->format_env);
		client->format_env = NULL;
	}
	
	if (!client->outgoing) {
		osync_error_set(error, OSYNC_ERROR_GENERIC, "No outgoing queue yet");
		goto error;
	}
	
	OSyncMessage *reply = osync_message_new_reply(message, NULL);
	if (!reply)
		goto error;

	if (!osync_queue_send_message(client->outgoing, NULL, reply, NULL))
		goto error_free_message;
	
	osync_message_unref(reply);
		
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;
	
error_free_message:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_discover(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	OSyncMessage *reply = NULL;
	int i = 0;
	
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	if (!osync_plugin_discover(client->plugin, client->plugin_data, client->plugin_info, error))
		goto error;

	reply = osync_message_new_reply(message, error);
	if (!reply)
		goto error;

	if (osync_plugin_info_get_main_sink(client->plugin_info))
		osync_message_write_int(reply, 1);
	else
		osync_message_write_int(reply, 0);

	int numobjs = osync_plugin_info_num_objtypes(client->plugin_info);
	int avail = 0;
	for (i = 0; i < numobjs; i++) {
		OSyncObjTypeSink *sink = osync_plugin_info_nth_objtype(client->plugin_info, i);
		if (osync_objtype_sink_is_available(sink)) {
			avail++;
		}
	}

	osync_message_write_int(reply, avail);
	
	for (i = 0; i < numobjs; i++) {
		OSyncObjTypeSink *sink = osync_plugin_info_nth_objtype(client->plugin_info, i);
		if (osync_objtype_sink_is_available(sink)) {
			if (!osync_marshal_objtype_sink(reply, sink, error))
				goto error_free_message;
		}
	}

	OSyncVersion *version = osync_plugin_info_get_version(client->plugin_info);
	if (version) {
		osync_message_write_int(reply, 1);
		osync_message_write_string(reply, osync_version_get_plugin(version));
		osync_message_write_string(reply, osync_version_get_priority(version));
		osync_message_write_string(reply, osync_version_get_modelversion(version));
		osync_message_write_string(reply, osync_version_get_firmwareversion(version));
		osync_message_write_string(reply, osync_version_get_softwareversion(version));
		osync_message_write_string(reply, osync_version_get_hardwareversion(version));
		osync_message_write_string(reply, osync_version_get_identifier(version));
	}else
		osync_message_write_int(reply, 0);
	
	OSyncCapabilities *capabilities = osync_plugin_info_get_capabilities(client->plugin_info);
	if (capabilities) {
		char* buffer;
		int size;
		osync_message_write_int(reply, 1);
		if(!osync_capabilities_assemble(capabilities, &buffer, &size))
			goto error_free_message;
		osync_message_write_string(reply, buffer);
		g_free(buffer);
	}else
		osync_message_write_int(reply, 0);

	if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
		goto error_free_message;
	
	osync_message_unref(reply);
	
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;
	
error_free_message:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_connect(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	char *objtype = NULL;
	int slowsync;
	OSyncMessage *reply = NULL;
	
	osync_message_read_string(message, &objtype);
	osync_message_read_int(message, &slowsync);
	osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
	
	OSyncObjTypeSink *sink = NULL;
	if (objtype) {
		sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
		
		if (!sink) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
			g_free(objtype);
			goto error;
		}
		
		g_free(objtype);
	} else
		sink = osync_plugin_info_get_main_sink(client->plugin_info);
		
	if (!sink) {
		reply = osync_message_new_reply(message, error);
		if (!reply)
			goto error;
		
		if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
			goto error_free_reply;
		
		osync_message_unref(reply);
	} else {
		/* set slowsync.
		   otherwise disable slowsync - to avoid slowsyncs every time with the same initiliazed engine
		   without finalizing the engine the next sync with the same engine would be again a slow-sync.
		   (unittest: sync - testcases: sync_easy_new_del, sync_easy_new_mapping) */
		if (slowsync)
			osync_objtype_sink_set_slowsync(sink, TRUE);
		else
			osync_objtype_sink_set_slowsync(sink, FALSE);

		OSyncContext *context = _create_context(client, message, _osync_client_connect_callback, NULL, error);
		if (!context)
			goto error;
		
		osync_plugin_info_set_sink(client->plugin_info, sink);
		osync_objtype_sink_connect(sink, client->plugin_data, client->plugin_info, context);

		osync_context_unref(context);
	}

	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_free_reply:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_disconnect(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	char *objtype = NULL;
	OSyncMessage *reply = NULL;
	
	osync_message_read_string(message, &objtype);
	osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
	
	OSyncObjTypeSink *sink = NULL;
	if (objtype) {
		sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
		
		if (!sink) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
			g_free(objtype);
			goto error;
		}
		
		g_free(objtype);
	} else
		sink = osync_plugin_info_get_main_sink(client->plugin_info);
		
	if (!sink) {
		reply = osync_message_new_reply(message, error);
		if (!reply)
			goto error;
		
		if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
			goto error_free_reply;
		
		osync_message_unref(reply);
	} else {
		OSyncContext *context = _create_context(client, message, _osync_client_disconnect_callback, NULL, error);
		if (!context)
			goto error;
		
		osync_plugin_info_set_sink(client->plugin_info, sink);
		osync_objtype_sink_disconnect(sink, client->plugin_data, client->plugin_info, context);
	
		osync_context_unref(context);
	}
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_free_reply:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_get_changes(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	char *objtype = NULL;
	OSyncMessage *reply = NULL;
	
	osync_message_read_string(message, &objtype);
	osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
	
	OSyncObjTypeSink *sink = NULL;
	if (objtype) {
		sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
		
		if (!sink) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
			g_free(objtype);
			goto error;
		}
		
		g_free(objtype);
	} else
		sink = osync_plugin_info_get_main_sink(client->plugin_info);
		
	if (!sink) {
		reply = osync_message_new_reply(message, error);
		if (!reply)
			goto error;
		
		if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
			goto error_free_reply;
		
		osync_message_unref(reply);
	} else {
		OSyncContext *context = _create_context(client, message, _osync_client_get_changes_callback, NULL, error);
		if (!context)
			goto error;
		osync_context_set_changes_callback(context, _osync_client_change_callback);
		
		osync_plugin_info_set_sink(client->plugin_info, sink);

		osync_objtype_sink_get_changes(sink, client->plugin_data, client->plugin_info, context);
	
		osync_context_unref(context);
	}
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_free_reply:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_read_change(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	const char *objtype = NULL;
	OSyncMessage *reply = NULL;
	
	OSyncChange *change = NULL;
	
	if (!osync_demarshal_change(message, &change, client->format_env, error))
		goto error;
		
	osync_trace(TRACE_INTERNAL, "Change %p", change);

	objtype = osync_data_get_objtype(osync_change_get_data(change));

	OSyncObjTypeSink *sink = NULL;
	if (objtype) {
		sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
		
		if (!sink) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
			goto error;
		}
	} else {
		sink = osync_plugin_info_get_main_sink(client->plugin_info);
	}
		
	if (!sink) {
		reply = osync_message_new_reply(message, error);
		if (!reply)
			goto error;
		
		if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
			goto error_free_reply;
		
		osync_message_unref(reply);
	} else {
		OSyncContext *context = _create_context(client, message, _osync_client_read_callback, change, error);
		if (!context)
			goto error;
		
		osync_plugin_info_set_sink(client->plugin_info, sink);

		osync_objtype_sink_read_change(sink, client->plugin_data, client->plugin_info, change, context);
	
		osync_context_unref(context);
	}

	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_free_reply:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}


static osync_bool _osync_client_handle_commit_change(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	OSyncChange *change = NULL;
	
	if (!osync_demarshal_change(message, &change, client->format_env, error))
		goto error;
		
	osync_trace(TRACE_INTERNAL, "Change %p", change);
	
	OSyncData *data = osync_change_get_data(change);
	
	osync_trace(TRACE_INTERNAL, "Searching sink for %s", osync_data_get_objtype(data));
	
	OSyncObjTypeSink *sink = NULL;
	sink = osync_plugin_info_find_objtype(client->plugin_info, osync_data_get_objtype(data));
	
	if (!sink) {
		osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", osync_data_get_objtype(data));
		osync_change_unref(change);
		goto error;
	}
		
	OSyncContext *context = _create_context(client, message, _osync_client_commit_change_callback, change, error);
	if (!context)
		goto error;
		
	osync_plugin_info_set_sink(client->plugin_info, sink);
	osync_objtype_sink_commit_change(sink, client->plugin_data, client->plugin_info, change, context);
	
	osync_change_unref(change);
	osync_context_unref(context);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_committed_all(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	char *objtype = NULL;
	OSyncMessage *reply = NULL;
	
	osync_message_read_string(message, &objtype);
	osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
	
	OSyncObjTypeSink *sink = NULL;
	if (objtype) {
		sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
		
		if (!sink) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
			g_free(objtype);
			goto error;
		}
		
		g_free(objtype);
	} else
		sink = osync_plugin_info_get_main_sink(client->plugin_info);
		
	if (!sink) {
		reply = osync_message_new_reply(message, error);
		if (!reply)
			goto error;
		
		if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
			goto error_free_reply;
		
		osync_message_unref(reply);
	} else {
		OSyncContext *context = _create_context(client, message, _osync_client_committed_all_callback, NULL, error);
		if (!context)
			goto error;
		
		osync_plugin_info_set_sink(client->plugin_info, sink);
		osync_objtype_sink_committed_all(sink, client->plugin_data, client->plugin_info, context);
	
		osync_context_unref(context);
	}
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_free_reply:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static osync_bool _osync_client_handle_sync_done(OSyncClient *client, OSyncMessage *message, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, message, error);

	char *objtype = NULL;
	OSyncMessage *reply = NULL;
	
	osync_message_read_string(message, &objtype);
	osync_trace(TRACE_INTERNAL, "Searching sink for %s", objtype);
	
	OSyncObjTypeSink *sink = NULL;
	if (objtype) {
		sink = osync_plugin_info_find_objtype(client->plugin_info, objtype);
		
		if (!sink) {
			osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sink for %s", objtype);
			g_free(objtype);
			goto error;
		}
		
		g_free(objtype);
	} else
		sink = osync_plugin_info_get_main_sink(client->plugin_info);
		
	if (!sink) {
		reply = osync_message_new_reply(message, error);
		if (!reply)
			goto error;
		
		if (!osync_queue_send_message(client->outgoing, NULL, reply, error))
			goto error_free_reply;
		
		osync_message_unref(reply);
	} else {
		OSyncContext *context = _create_context(client, message, _osync_client_sync_done_callback, NULL, error);
		if (!context)
			goto error;
		
		osync_plugin_info_set_sink(client->plugin_info, sink);
		osync_objtype_sink_sync_done(sink, client->plugin_data, client->plugin_info, context);
	
		osync_context_unref(context);
	}
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_free_reply:
	osync_message_unref(reply);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static void _osync_client_message_handler(OSyncMessage *message, void *user_data)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data);
	OSyncClient *client = user_data;

	OSyncError *error = NULL;

	osync_trace(TRACE_INTERNAL, "plugin received command %i", osync_message_get_command(message));

	switch (osync_message_get_command(message)) {
		case OSYNC_MESSAGE_NOOP:
		case OSYNC_MESSAGE_REPLY:
	  	case OSYNC_MESSAGE_ERRORREPLY:
	  	case OSYNC_MESSAGE_NEW_CHANGE:
	  	case OSYNC_MESSAGE_SYNCHRONIZE:
		case OSYNC_MESSAGE_ENGINE_CHANGED:
		case OSYNC_MESSAGE_MAPPING_CHANGED:
		case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED:
			//Ignore these. They dont have any meaning to the client
			break;
		case OSYNC_MESSAGE_QUEUE_ERROR:
		case OSYNC_MESSAGE_ERROR:
		case OSYNC_MESSAGE_QUEUE_HUP:
			/* Handle disconnect here */
			break;
			
		case OSYNC_MESSAGE_INITIALIZE:
			if (!_osync_client_handle_initialize(client, message, &error))
				goto error;
			break;
			
		case OSYNC_MESSAGE_FINALIZE:
			if (!_osync_client_handle_finalize(client, message, &error))
				goto error;
			break;
			
		case OSYNC_MESSAGE_DISCOVER:
			if (!_osync_client_handle_discover(client, message, &error))
				goto error;
			break;
			
		case OSYNC_MESSAGE_CONNECT:
			if (!_osync_client_handle_connect(client, message, &error))
				goto error;
			break;
	
		case OSYNC_MESSAGE_DISCONNECT:
			if (!_osync_client_handle_disconnect(client, message, &error))
				goto error;
			break;
			
		case OSYNC_MESSAGE_GET_CHANGES:
			if (!_osync_client_handle_get_changes(client, message, &error))
				goto error;
			break;
	
		case OSYNC_MESSAGE_COMMIT_CHANGE:
			if (!_osync_client_handle_commit_change(client, message, &error))
				goto error;
			break;
			
		case OSYNC_MESSAGE_SYNC_DONE:
			if (!_osync_client_handle_sync_done(client, message, &error))
				goto error;
			break;
			
	  	case OSYNC_MESSAGE_COMMITTED_ALL:
			if (!_osync_client_handle_committed_all(client, message, &error))
				goto error;
			break;
	
		case OSYNC_MESSAGE_READ_CHANGE:
			if (!_osync_client_handle_read_change(client, message, &error))
				goto error;
			break;
	
		case OSYNC_MESSAGE_CALL_PLUGIN:
			/*
			char *function = itm_message_get_data(message, "function");
			void *data = itm_message_get_data(message, "data");
			OSyncError *error = NULL;
			void *replydata = osync_member_call_plugin(client->member, function, data, &error);
	
			if (itm_message_get_data(message, "want_reply")) {
				ITMessage *reply = NULL;
				if (!osync_error_is_set(&error)) {
					reply = itm_message_new_methodreply(client, message);
					itm_message_set_data(message, "reply", replydata);
				} else {
					reply = itm_message_new_errorreply(client, message);
					itm_message_set_error(reply, error);
				}
	
				itm_message_send_reply(reply);
			}
			*/
			break;
	}

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;

error:;
	if (!client->outgoing) {
		client->thread = NULL;
		osync_client_shutdown(client);
		osync_trace(TRACE_EXIT_ERROR, "%s: Unable to notify parent. no outgoing queue: %s", __func__, osync_error_print(&error));
		osync_error_unref(&error);
		return;
	}

	OSyncError *locerror = NULL;
	OSyncMessage *errorreply = osync_message_new_errorreply(message, error, &locerror);
	if (!errorreply) {
		osync_client_error_shutdown(client, locerror);
		osync_error_unref(&error);
		osync_trace(TRACE_EXIT_ERROR, "%s: Error while sending error: %s", __func__, osync_error_print(&locerror));
		osync_error_unref(&locerror);
		return;
	}

	if (!osync_queue_send_message(client->outgoing, NULL, errorreply, &locerror)) {
		osync_client_error_shutdown(client, locerror);
		osync_error_unref(&error);
		osync_trace(TRACE_EXIT_ERROR, "%s: Error while sending error: %s", __func__, osync_error_print(&locerror));
		osync_error_unref(&locerror);
		return;
	}

	osync_message_unref(errorreply);

	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
	osync_error_unref(&error);
}

/** This function takes care of the messages received on the outgoing (sending)
 * queue. The only messages we can receive there, are HUPs or ERRORs. */
static void _osync_client_hup_handler(OSyncMessage *message, void *user_data)
{
	OSyncClient *client = user_data;
	OSyncError *error = NULL;
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data);

	osync_trace(TRACE_INTERNAL, "plugin received command %i on sending queue", osync_message_get_command(message));

	if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_ERROR) {
		/* Houston, we have a problem */
	} else if (osync_message_get_command(message) == OSYNC_MESSAGE_QUEUE_HUP) {
		/* The remote side disconnected. So we can now disconnect as well and then
		 * shutdown */
		if (!osync_queue_disconnect(client->outgoing, &error))
			osync_error_unref(&error);
		
		if (!osync_queue_disconnect(client->incoming, &error))
			osync_error_unref(&error);
		
		if (client->syncloop) {
			g_main_loop_quit(client->syncloop);
		}
	} else {
		/* This should never ever happen */
		osync_trace(TRACE_ERROR, "received neither a hup, nor a error on a sending queue...");
	}

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;
}

OSyncClient *osync_client_new(OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p)", __func__, error);
	
	OSyncClient *client = osync_try_malloc0(sizeof(OSyncClient), error);
	if (!client) {
		osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
		return NULL;
	}
	
	client->ref_count = 1;
	client->context = g_main_context_new();
	
	osync_trace(TRACE_EXIT, "%s: %p", __func__, client);
	return client;
}

void osync_client_ref(OSyncClient *client)
{
	osync_assert(client);
	
	g_atomic_int_inc(&(client->ref_count));
}

void osync_client_unref(OSyncClient *client)
{
	osync_assert(client);
	
	if (g_atomic_int_dec_and_test(&(client->ref_count))) {
		osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
		
		if (client->incoming) {
			if (osync_queue_is_connected(client->incoming))
				osync_queue_disconnect(client->incoming, NULL);
			osync_queue_remove(client->incoming, NULL);
			osync_queue_free(client->incoming);
		}
	
		if (client->outgoing) {
			if (osync_queue_is_connected(client->outgoing))
				osync_queue_disconnect(client->outgoing, NULL);
			osync_queue_free(client->outgoing);
		}
	
		if (client->plugin)
			osync_plugin_unref(client->plugin);
		
		if (client->thread)
			osync_thread_free(client->thread);
		
		g_free(client);
		
		osync_trace(TRACE_EXIT, "%s", __func__);
	}
}

void osync_client_set_incoming_queue(OSyncClient *client, OSyncQueue *incoming)
{
	osync_queue_set_message_handler(incoming, _osync_client_message_handler, client);
	osync_queue_setup_with_gmainloop(incoming, client->context);
	client->incoming = incoming;
}

void osync_client_set_outgoing_queue(OSyncClient *client, OSyncQueue *outgoing)
{
	osync_queue_set_message_handler(outgoing, _osync_client_hup_handler, client);
	osync_queue_setup_with_gmainloop(outgoing, client->context);
	client->outgoing = outgoing;
}

void osync_client_run_and_block(OSyncClient *client)
{
	client->syncloop = g_main_loop_new(client->context, TRUE);
	g_main_loop_run(client->syncloop);
}

osync_bool osync_client_run(OSyncClient *client, OSyncError **error)
{
	client->thread = osync_thread_new(client->context, error);
	if (!client->thread)
		return FALSE;
		
	osync_thread_start(client->thread);
	
	return TRUE;
}

static gboolean osyncClientConnectCallback(gpointer data)
{
	OSyncClient *client = data;
	osync_trace(TRACE_INTERNAL, "About to connect to the incoming queue");
	
	/* We now connect to our incoming queue */
	osync_queue_connect(client->incoming, OSYNC_QUEUE_RECEIVER, NULL);
	
	return FALSE;
}


osync_bool osync_client_run_external(OSyncClient *client, char *pipe_path, OSyncPlugin *plugin, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p)", __func__, client, pipe_path, plugin, error);
	/* Create connection pipes **/
	OSyncQueue *incoming = osync_queue_new(pipe_path, error);
	if (!incoming)
		goto error;
	
	if (!osync_queue_create(incoming, error))
		goto error_free_queue;
	
	osync_client_set_incoming_queue(client, incoming);
	
	client->thread = osync_thread_new(client->context, error);
	if (!client->thread)
		goto error_remove_queue;
	
	osync_thread_start(client->thread);
	
	client->plugin = plugin;
	osync_plugin_ref(client->plugin);
	
	GSource *source = NULL;
	
	source = g_idle_source_new();
	g_source_set_callback(source, osyncClientConnectCallback, client, NULL);
	g_source_attach(source, client->context);
	
	osync_trace(TRACE_EXIT, "%s", __func__);
	return TRUE;

error_remove_queue:
	osync_queue_remove(incoming, NULL);
error_free_queue:
	osync_queue_free(incoming);
error:
	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	return FALSE;
}

static gboolean osyncClientDisconnectCallback(gpointer data)
{
	OSyncClient *client = data;
	
	/* First, we disconnect our incoming queue. This will generate a HUP on the remote
	 * side. We dont disconnect our outgoing queue yet, since we have to make sure that
	 * all data is read. Only the listener should disconnect a pipe! */
	osync_queue_disconnect(client->incoming, NULL);

	if (client->outgoing) {
		/* We now wait until the other side disconnect our outgoing queue */
		while (osync_queue_is_connected(client->outgoing)) { g_usleep(100); }
		
		/* Now we can safely disconnect our outgoing queue */
		osync_queue_disconnect(client->outgoing, NULL);
	}
	
	return FALSE;
}

void osync_client_disconnect(OSyncClient *client)
{
	GSource *source = NULL;
	osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
	
	source = g_idle_source_new();
	g_source_set_callback(source, osyncClientDisconnectCallback, client, NULL);
	g_source_attach(source, client->context);
	
	g_source_unref(source);
			
	osync_trace(TRACE_EXIT, "%s", __func__);
}

void osync_client_shutdown(OSyncClient *client)
{
	osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client);
	osync_assert(client);
	
	osync_client_disconnect(client);
			
	if (client->syncloop) {
		if (g_main_loop_is_running(client->syncloop)) {

			/* now we can quit the main loop */
			g_main_loop_quit(client->syncloop);
		}
		
		g_main_loop_unref(client->syncloop);
		client->syncloop = NULL;
	} else if (client->thread) {
		osync_thread_stop(client->thread);
		osync_thread_free(client->thread);
		client->thread = NULL;
	}
	
	osync_trace(TRACE_EXIT, "%s", __func__);
}

void osync_client_error_shutdown(OSyncClient *client, OSyncError *error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, client, error);

	OSyncMessage *message = osync_message_new_error(error, NULL);
	if (message)
		osync_queue_send_message(client->outgoing, NULL, message, NULL);

	osync_client_shutdown(client);

	osync_trace(TRACE_EXIT, "%s", __func__);
}

/*void *osync_client_message_sink(OSyncMember *member, const char *name, void *data, osync_bool synchronous)
{
	OSyncClient *client = osync_member_get_data(member);
	OSyncEngine *engine = client->engine;
	if (!synchronous) {

		ITMessage *message = itm_message_new_signal(client, "PLUGIN_MESSAGE");
		osync_debug("CLI", 3, "Sending message %p PLUGIN_MESSAGE for message %s", message, name);
		itm_message_set_data(message, "data", data);
		itm_message_set_data(message, "name", g_strdup(name));
		itm_queue_send(engine->incoming, message);

		return NULL;
	} else {
		return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata);
	}

  return NULL;
}*/
